Source code for libpyhat.transform.cal_tran
import numpy as np
import pandas as pd
from numpy.linalg import norm
from scipy.linalg import cho_factor, cho_solve, svdvals
from sklearn.cross_decomposition import CCA, PLSRegression
from sklearn.preprocessing import normalize
import libpyhat.transform.caltran_utils as ctu
# Apply calibration transfer to a data set to transform it to match a
# different data set. See the illustrative usage sketch at the end of this
# module.
class cal_tran:
def __init__(self, params):
self.algorithm_list = [
"None",
"PDS - Piecewise DS",
"DS - Direct Standardization",
"LASSO DS",
"Ratio",
"Ridge DS",
"Sparse Low Rank DS",
"CCA - Canonical Correlation Analysis",
"New CCA",
"Incremental Proximal Descent DS",
"Forward Backward DS",
]
self.method = params.pop("method")
self.ct_obj = None
        if self.method == "PDS - Piecewise DS":
            self.ct_obj = piecewise_ds(**params)
        elif self.method == "None":
            self.ct_obj = no_transform()
        elif self.method == "DS - Direct Standardization":
            self.ct_obj = ds(**params)
        elif self.method == "LASSO DS":
            self.ct_obj = admm_ds(**params)
        elif self.method == "Ridge DS":
            self.ct_obj = admm_ds(**params)
        elif self.method == "Sparse Low Rank DS":
            self.ct_obj = admm_ds(**params)
        elif self.method == "CCA - Canonical Correlation Analysis":
            self.ct_obj = cca(**params)
        elif self.method == "New CCA":
            self.ct_obj = cca(**params)
        elif self.method == "Incremental Proximal Descent DS":
            self.ct_obj = ipd_ds(**params)
        elif self.method == "Forward Backward DS":
            self.ct_obj = forward_backward_ds(**params)
        elif self.method == "Ratio":
            self.ct_obj = ratio()
        elif self.method == "PDS-PLS - PDS using Partial Least Squares":
            self.ct_obj = piecewise_ds(**params)
if self.ct_obj is None:
print("Method " + self.method + " not recognized!")
def save_transform(self, outfile, wvls):
print("Saving transform to " + outfile)
if self.method == "Ratio":
self.ct_obj.ratio_vect.to_csv(outfile)
elif self.method == "PDS - Piecewise DS":
if self.ct_obj.win_size == 1:
print(
"Window size is 1. Saving the diagonal of the transform "
"matrix only."
)
out_vect = self.ct_obj.proj_to_B.diagonal()
out_vect = pd.DataFrame(out_vect, index=wvls)
out_vect.to_csv(outfile)
else:
print("Saving transform matrix (this can be large!)")
tmp = pd.DataFrame(self.ct_obj.proj_to_B, columns=wvls, index=wvls)
tmp.to_csv(outfile)
else:
print("Save transform not yet implemented for " + self.method)
class ratio:
def derive_transform(self, A, B):
A_mean = np.mean(A, axis=0)
B_mean = np.mean(B, axis=0)
self.ratio_vect = B_mean / A_mean
def apply_transform(self, C):
C_transformed = np.multiply(C, self.ratio_vect)
return C_transformed
class piecewise_ds:
def __init__(self, win_size=5, pls=False, nc=5):
self.pls = pls
self.win_size = win_size
self.pls_nc = nc
assert win_size % 2 == 1, "Window size must be odd."
        self.padding = (win_size - 1) // 2
def derive_transform(self, A, B):
A = np.array(A, dtype="float")
B = np.array(B, dtype="float")
# TODO: Use exception handling instead.
assert A.shape == B.shape, "Input matrices must be the same shape."
self.B_shape = B.shape
n_feats = B.shape[1]
coefs = []
for i in range(n_feats):
row = np.zeros(n_feats)
start = int(max(i - self.padding, 0))
end = int(min(i + self.padding, n_feats - 1) + 1)
if self.pls:
model = PLSRegression(n_components=self.pls_nc, scale=False)
# TODO: Swap with exception handling.
try:
model.fit(A[:, start:end], B[:, i])
row[start:end] = model.coef_.ravel()
                except Exception:
                    # If the local PLS fit fails (e.g. too few samples for the
                    # requested number of components), leave this row as zeros.
                    pass
else:
row[start:end] = np.dot(np.linalg.pinv(A[:, start:end]), B[:, i])
coefs.append(row)
self.proj_to_B = np.array(coefs).T

    def apply_transform(self, C):
        # Project spectra into the target space using the fitted piecewise
        # transform matrix.
        C = np.array(C, dtype="float")
        return np.dot(C, self.proj_to_B)
class ds:
def __init__(self, fit_intercept=False):
self.fit_intercept = fit_intercept
def get_working_data(self, data):
data = np.array(data, dtype=float)
if len(data.shape) == 1:
data = np.reshape(data, (1, data.shape[0]))
if self.fit_intercept:
working = np.hstack((data, np.ones((data.shape[0], 1))))
else:
working = np.copy(data)
return working
def derive_transform(self, A, B):
assert (
A.shape[0] == B.shape[0]
), "Input matrices must have the same number of rows (i.e. samples)."
working_A = self.get_working_data(A)
self.proj_to_B = np.dot(np.linalg.pinv(working_A), B)
def apply_transform(self, C):
working_C = self.get_working_data(C)
C_transformed = np.dot(working_C, self.proj_to_B)
return C_transformed
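
# Illustrative sketch (not part of the original module): a minimal example of
# fitting the ds class above on paired synthetic spectra and applying it to new
# data. The helper name _example_ds and the toy array sizes are hypothetical;
# only numpy (already imported above) is required.
def _example_ds():
    rng = np.random.default_rng(0)
    A = rng.normal(size=(20, 8))        # spectra measured on instrument A
    B = A * 1.1 + 0.05                  # the same samples as seen by instrument B
    model = ds(fit_intercept=True)
    model.derive_transform(A, B)        # proj_to_B maps A's space into B's space
    C = rng.normal(size=(5, 8))         # new instrument-A spectra
    return model.apply_transform(C)     # C expressed in instrument B's space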
class admm_ds:
def __init__(
self, rho=1, beta=0.2, epsilon=1e-5, max_iter=100, verbose=True, reg="lasso"
):
self.rho = rho
self.beta = beta
self.epsilon = epsilon
self.max_iter = max_iter
self.verbose = verbose
self.reg = reg
def L2_norm(self, data):
if len(data.shape) > 1:
w = np.array(np.sqrt(np.sum(np.square(data), axis=1)))
data_normed = data / w[:, None]
else:
w = np.array(np.sqrt(np.sum(np.square(data), axis=0)))
data_normed = data / w
return data_normed, w
def undo_L2_norm(self, data_normed, w):
if len(data_normed.shape) > 1:
data = data_normed * w[:, None]
else:
data = data_normed * w
return data
def derive_transform(self, A, B):
n = A.shape[1]
P = np.zeros((n, n))
Z = P.copy()
Y = P.copy()
A, Aw = self.L2_norm(A)
B, Bw = self.L2_norm(B)
AtA = np.dot(A.T, A)
AtB = np.dot(A.T, B)
if self.reg == "fused":
iden = np.identity(n - 1)
pos = np.hstack((iden, np.zeros((len(iden), 1))))
neg = -1 * np.roll(pos, 1, 1)
D = np.vstack(((pos + neg), np.zeros((1, len(iden) + 1))))
P_fact = cho_factor(AtA + self.rho * np.dot(D.T, D))
else:
P_fact = cho_factor(AtA + self.rho * np.eye(n))
if self.reg == "ridge":
Z_fact = cho_factor(2 * np.eye(n) + self.rho * np.eye(n))
for it in range(self.max_iter):
last_P = P
last_Z = Z
if self.reg == "fused":
Z = ctu.soft_thresh(np.dot(D, P) + (Y / self.rho), self.beta / self.rho)
elif self.reg == "rank":
Z = ctu.svt_thresh(P + (Y / self.rho), self.beta / self.rho)
elif self.reg == "ridge":
Z = cho_solve(Z_fact, self.rho * P + Y)
elif self.reg == "sp_lr":
Z = (
ctu.soft_thresh(P + (Y / self.rho), self.beta / self.rho)
+ ctu.svt_thresh(P + (Y / self.rho), self.beta / self.rho)
) / 2.0
elif self.reg == "lasso":
Z = ctu.soft_thresh(P + (Y / self.rho), self.beta / self.rho)
else:
self.proj_to_B = None
return "ERROR: Regularizer not programmed."
if self.reg == "fused":
P = cho_solve(P_fact, AtB + self.rho * np.dot(D.T, Z) - np.dot(D.T, Y))
Y += self.rho * (np.dot(D, P) - Z)
else:
P = cho_solve(P_fact, AtB + self.rho * Z - Y)
Y += self.rho * (P - Z)
P_conv = norm(P - last_P) / norm(P)
Z_conv = norm(Z - last_Z) / norm(Z)
if self.verbose:
# num_zero = np.count_nonzero(P<=1e-9)
if self.reg == "fused":
print(
it,
P_conv,
Z_conv,
norm(np.dot(D, P) - Z),
np.count_nonzero(Z),
sum(svdvals(Z)),
)
print("score: %.4f" % (norm(np.dot(D, P) - Z) + norm(A - B.dot(Z))))
else:
print(
it, P_conv, Z_conv, norm(P - Z), np.count_nonzero(Z), norm(Z, 1)
) # ,sum(sp.linalg.svdvals(Z)))
print("score: %.4f" % (norm(P - Z) + norm(A - B.dot(Z))))
if P_conv <= self.epsilon and Z_conv <= self.epsilon:
break
else:
print("Didn't converge in %d steps" % self.max_iter)
self.proj_to_B = P
def apply_transform(self, C):
C, Cw = self.L2_norm(C)
C_proj_to_B = np.dot(C, self.proj_to_B)
C_proj_to_B = self.undo_L2_norm(C_proj_to_B, Cw)
return C_proj_to_B
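
# Illustrative sketch (not part of the original module): one way the admm_ds
# class above might be exercised, here with the "ridge" regularizer so that
# only numpy/scipy routines are touched. The helper name and toy data are
# hypothetical; verbose printing is disabled to keep the demo quiet.
def _example_admm_ds():
    rng = np.random.default_rng(0)
    A = rng.normal(size=(30, 10))
    B = A @ np.diag(rng.uniform(0.8, 1.2, size=10))  # channel-wise gain differences
    model = admm_ds(rho=1, max_iter=50, verbose=False, reg="ridge")
    model.derive_transform(A, B)                     # fits proj_to_B via ADMM
    C = rng.normal(size=(3, 10))
    return model.apply_transform(C)                  # rows are L2-normed, projected, then un-normed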
class cca:
def __init__(self, n_components=1, ccatype=None):
self.n_components = n_components
self.ccatype = ccatype
def derive_transform(self, A, B):
self.model = CCA(n_components=self.n_components, scale=False).fit(A, B)
if self.ccatype == "new":
x_scores, y_scores = self.model.transform(A, B)
            # Method from http://onlinelibrary.wiley.com/doi/10.1002/cem.2637/abstract
            # (see the illustrative sketch after this class).
F1 = np.linalg.pinv(x_scores).dot(y_scores)
F2 = np.linalg.pinv(y_scores).dot(B)
P = ctu.multi_dot((self.model.x_weights_, F1, F2))
self.proj_to_B = P
else:
return self.model
def apply_transform(self, C):
C = np.array(C)
if self.ccatype == "new":
return C.dot(self.proj_to_B)
else:
if len(C.shape) == 1:
C = C.reshape(1, -1)
return self.model.predict(C)
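
# Illustrative sketch (not part of the original module): exercising the cca
# class above in its "new" mode, which builds an explicit projection matrix
# from the CCA scores and weights following the reference cited in
# derive_transform. The helper name and toy shapes are hypothetical.
def _example_new_cca():
    rng = np.random.default_rng(0)
    A = rng.normal(size=(25, 6))            # instrument A spectra
    B = A @ rng.normal(size=(6, 6)) * 0.5   # instrument B spectra, linearly related to A
    model = cca(n_components=2, ccatype="new")
    model.derive_transform(A, B)            # stores proj_to_B
    C = rng.normal(size=(4, 6))
    return model.apply_transform(C)         # equivalent to C.dot(model.proj_to_B)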
class ipd_ds:
def __init__(
self, t=0.0002, svt=10, l1=10, epsilon=1e-5, max_iter=50, verbose=True
):
self.t = t
self.svt = svt
self.l1 = l1
self.epsilon = epsilon
self.max_iter = max_iter
self.verbose = verbose
def derive_transform(self, A, B):
        # Incremental proximal descent (Bertsekas, 2010); see the illustrative
        # sketch after this class.
P = np.eye(A.shape[1])
# P = np.zeros(B.shape[1])
A = normalize(A, axis=1)
B = normalize(B, axis=1)
AtA = np.dot(A.T, A)
AtB = np.dot(A.T, B)
for it in range(self.max_iter):
last_P = P.copy()
P = P - self.t * (np.dot(AtA, P) - AtB)
P = ctu.svt_thresh(P, self.svt * self.t)
P = ctu.soft_thresh(P, self.l1 * self.t)
P_conv = norm(P - last_P) / norm(P)
if self.verbose:
# svdsum = sum(sp.linalg.svdvals(P))
# print(it, P_conv, norm(A-B.dot(P)), norm(P,1), svdsum)
print(it, P_conv, norm(B - A.dot(P)), norm(P, 1))
# print("score: %.4f" % (norm(A-B.dot(P))+norm(P,1)+svdsum))
if P_conv <= self.epsilon:
break
else:
print("Didn't converge in %d steps" % self.max_iter)
self.proj_to_B = P
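
# Illustrative sketch (not part of the original module): one way the ipd_ds
# class above might be exercised. Each iteration takes a gradient step on the
# least-squares objective and then applies singular-value and soft
# thresholding. The helper name and toy data are hypothetical; note that the
# class stores proj_to_B rather than providing an apply_transform method.
def _example_ipd_ds():
    rng = np.random.default_rng(0)
    A = rng.normal(size=(40, 12))
    B = A @ np.diag(rng.uniform(0.8, 1.2, size=12))  # channel-wise gain differences
    model = ipd_ds(max_iter=25, verbose=False)
    model.derive_transform(A, B)                     # fits a sparse, low-rank-leaning proj_to_B
    C = rng.normal(size=(2, 12))
    return C.dot(model.proj_to_B)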
class forward_backward_ds:
def __init__(self, t=0.001, svt=1, l1=1, epsilon=1e-5, max_iter=20, verbose=True):
self.t = t
self.svt = svt
self.l1 = l1
self.epsilon = epsilon
self.max_iter = max_iter
self.verbose = verbose
def derive_transform(self, A, B):
        # The transform is a square matrix (one coefficient per pair of
        # spectral channels), matching the other DS variants above.
        P = np.zeros((B.shape[1], B.shape[1]))
Z1 = P.copy()
Z2 = P.copy()
A = normalize(A, axis=1)
B = normalize(B, axis=1)
AtA = np.dot(A.T, A)
AtB = np.dot(A.T, B)
for it in range(self.max_iter):
last_P = P.copy()
G = np.dot(AtA, P) - AtB
Z1 = ctu.svt_thresh(2 * P - Z1 - self.t * G, 2 * self.svt * self.t)
Z2 = ctu.soft_thresh(2 * P - Z2 - self.t * G, 2 * self.l1 * self.t)
P = (Z1 + Z2) / 2.0
P_conv = norm(P - last_P) / norm(P)
if self.verbose:
print(it, P_conv)
if P_conv <= self.epsilon:
break
else:
print("Didn't converge in %d steps" % self.max_iter)
self.proj_to_B = P
def call_cal_tran(
A,
B,
C,
dataAmatchcol,
dataBmatchcol,
params,
spect_label="wvl",
dataAname="A",
dataBname="B",
dataCname="C",
):
    # A, B, and C are data frames. The transform is fit to align A to B, then
    # applied to C. See the usage sketch at the end of this module.
print(
"Running calibration transfer. The transformation will be fit to "
"make " + dataAname + " match " + dataBname
)
print("Transformation will then be applied to " + dataCname)
    ctu.check_data(A, C, dataAname, dataCname, spect_label=spect_label)
ctu.check_data(B, C, dataBname, dataCname, spect_label=spect_label)
A_mean, B_mean = ctu.prepare_data(A, B, dataAmatchcol, dataBmatchcol)
ct_obj = cal_tran(params)
ct_obj.derive_transform(A_mean[spect_label].values, B_mean[spect_label].values)
C[spect_label] = ct_obj.apply_transform(C[spect_label])
return C, ct_obj
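
# Illustrative usage sketch (not part of the original module): fitting a
# calibration transfer on paired spectra from two instruments and applying it
# to new data, driving a cal_tran object the same way call_cal_tran does.
# call_cal_tran itself additionally expects pandas DataFrames with a spectral
# column level (default "wvl") and columns for matching samples between A and
# B. The toy arrays and parameter values below are hypothetical.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    A = rng.normal(size=(20, 8))                     # spectra from instrument A
    B = A * 1.05 + 0.02                              # same samples measured on instrument B
    C = rng.normal(size=(5, 8))                      # new instrument-A spectra to transform
    params = {"method": "DS - Direct Standardization", "fit_intercept": True}
    ct = cal_tran(params)
    ct.derive_transform(A, B)                        # fit the A -> B transform
    C_transformed = ct.apply_transform(C)            # express C in instrument B's space
    print(C_transformed.shape)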