Source code for libpyhat.transform.cal_tran
import numpy as np
import pandas as pd
from numpy.linalg import norm
from scipy.linalg import cho_factor, cho_solve, svdvals
from sklearn.cross_decomposition import CCA, PLSRegression
from sklearn.preprocessing import normalize
import libpyhat.transform.caltran_utils as ctu
# apply calibration transfer to a data set to transform it to match a
# different data set.
[docs]
class cal_tran:
    """Dispatcher for calibration-transfer algorithms.

    Selects and constructs one of the transform objects defined in this
    module based on params["method"], then delegates fitting/application
    to it.

    Parameters
    ----------
    params : dict
        Must contain a "method" key naming the algorithm; all remaining
        entries are forwarded as keyword arguments to the selected
        algorithm's constructor. Note: "method" is popped, so the
        caller's dict is mutated.
    """

    def __init__(self, params):
        # Methods advertised to callers (e.g. for building a menu).
        # "PDS-PLS" was dispatched below but missing from this list;
        # added so the list stays in sync with the dispatch code.
        self.algorithm_list = [
            "None",
            "PDS - Piecewise DS",
            "PDS-PLS - PDS using Partial Least Squares",
            "DS - Direct Standardization",
            "LASSO DS",
            "Ratio",
            "Ridge DS",
            "Sparse Low Rank DS",
            "CCA - Canonical Correlation Analysis",
            "New CCA",
            "Incremental Proximal Descent DS",
            "Forward Backward DS",
        ]
        self.method = params.pop("method")
        self.ct_obj = None
        # elif chain: methods are mutually exclusive, and the class for a
        # branch is only referenced when that branch is actually selected.
        if self.method == "None":
            self.ct_obj = no_transform()
        elif self.method == "Ratio":
            self.ct_obj = ratio()
        elif self.method in (
            "PDS - Piecewise DS",
            "PDS-PLS - PDS using Partial Least Squares",
        ):
            self.ct_obj = piecewise_ds(**params)
        elif self.method == "DS - Direct Standardization":
            self.ct_obj = ds(**params)
        elif self.method in ("LASSO DS", "Ridge DS", "Sparse Low Rank DS"):
            # admm_ds handles all three; the regularizer is selected via
            # the "reg" entry expected in params.
            self.ct_obj = admm_ds(**params)
        elif self.method in ("CCA - Canonical Correlation Analysis", "New CCA"):
            self.ct_obj = cca(**params)
        elif self.method == "Incremental Proximal Descent DS":
            self.ct_obj = ipd_ds(**params)
        elif self.method == "Forward Backward DS":
            self.ct_obj = forward_backward_ds(**params)
        if self.ct_obj is None:
            print("Method " + self.method + " not recognized!")

    def derive_transform(self, A, B):
        """Fit the selected transform so that A is mapped to match B.

        Added because call_cal_tran() invokes this on a cal_tran
        instance, which previously had no such method (AttributeError).
        """
        return self.ct_obj.derive_transform(A, B)

    def apply_transform(self, C):
        """Apply the fitted transform to new data C (delegates)."""
        return self.ct_obj.apply_transform(C)

    def save_transform(self, outfile, wvls):
        """Write the fitted transform to outfile as CSV, indexed by wvls.

        Only implemented for "Ratio" and "PDS - Piecewise DS"; other
        methods just print a message.
        """
        print("Saving transform to " + outfile)
        if self.method == "Ratio":
            self.ct_obj.ratio_vect.to_csv(outfile)
        elif self.method == "PDS - Piecewise DS":
            if self.ct_obj.win_size == 1:
                # Window of 1 makes the transform matrix diagonal, so the
                # diagonal alone captures it.
                print(
                    "Window size is 1. Saving the diagonal of the transform "
                    "matrix only."
                )
                out_vect = self.ct_obj.proj_to_B.diagonal()
                out_vect = pd.DataFrame(out_vect, index=wvls)
                out_vect.to_csv(outfile)
            else:
                print("Saving transform matrix (this can be large!)")
                tmp = pd.DataFrame(self.ct_obj.proj_to_B, columns=wvls, index=wvls)
                tmp.to_csv(outfile)
        else:
            print("Save transform not yet implemented for " + self.method)
[docs]
class ratio:
    """Calibration transfer by per-channel mean ratio.

    Stores the elementwise ratio of the column means of B to those of A,
    and applies it multiplicatively to new spectra.
    """

    def derive_transform(self, A, B):
        """Compute ratio_vect = mean(B, axis=0) / mean(A, axis=0)."""
        self.ratio_vect = np.mean(B, axis=0) / np.mean(A, axis=0)

    def apply_transform(self, C):
        """Scale C elementwise by the stored ratio vector."""
        return np.multiply(C, self.ratio_vect)
[docs]
class piecewise_ds:
    """Piecewise direct standardization (PDS).

    For each spectral channel i, fits a local regression from a window of
    channels of A (centered on i, width win_size) to channel i of B. The
    per-channel coefficient rows are assembled into the banded transform
    matrix proj_to_B.

    Parameters
    ----------
    win_size : int
        Odd window width in channels.
    pls : bool
        If True, fit each window with PLS regression instead of the
        pseudoinverse least-squares solution.
    nc : int
        Number of PLS components (used only when pls=True).
    """

    def __init__(self, win_size=5, pls=False, nc=5):
        self.pls = pls
        self.win_size = win_size
        self.pls_nc = nc
        # TODO: Use exception handling instead of assert (stripped by -O).
        assert win_size % 2 == 1, "Window size must be odd."
        # Integer half-width of the window (win_size is odd, so exact).
        self.padding = (win_size - 1) // 2

    def derive_transform(self, A, B):
        """Fit proj_to_B so that A @ proj_to_B approximates B.

        A and B must have identical shapes (same samples, same channels).
        """
        A = np.array(A, dtype="float")
        B = np.array(B, dtype="float")
        # TODO: Use exception handling instead.
        assert A.shape == B.shape, "Input matrices must be the same shape."
        self.B_shape = B.shape
        n_feats = B.shape[1]
        coefs = []
        for i in range(n_feats):
            row = np.zeros(n_feats)
            # Clip the window to the valid channel range at the edges.
            start = max(i - self.padding, 0)
            end = min(i + self.padding, n_feats - 1) + 1
            if self.pls:
                model = PLSRegression(n_components=self.pls_nc, scale=False)
                try:
                    model.fit(A[:, start:end], B[:, i])
                    row[start:end] = model.coef_.ravel()
                except Exception:
                    # Fit can fail (e.g. n_components exceeds the clipped
                    # window width near the edges); leave the row zero.
                    # Was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    pass
            else:
                row[start:end] = np.dot(np.linalg.pinv(A[:, start:end]), B[:, i])
            coefs.append(row)
        self.proj_to_B = np.array(coefs).T
[docs]
class ds:
    """Direct standardization: a single least-squares projection A -> B.

    When fit_intercept is True, a constant column of ones is appended to
    the data so the fitted transform includes an offset term.
    """

    def __init__(self, fit_intercept=False):
        self.fit_intercept = fit_intercept

    def get_working_data(self, data):
        """Return data as a float 2-D array (1-D input becomes one row),
        with a ones column appended when fitting an intercept."""
        arr = np.array(data, dtype=float)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        if not self.fit_intercept:
            return arr.copy()
        ones = np.ones((arr.shape[0], 1))
        return np.hstack((arr, ones))

    def derive_transform(self, A, B):
        """Fit proj_to_B = pinv(working(A)) @ B."""
        assert (
            A.shape[0] == B.shape[0]
        ), "Input matrices must have the same number of rows (i.e. samples)."
        self.proj_to_B = np.linalg.pinv(self.get_working_data(A)).dot(B)

    def apply_transform(self, C):
        """Project new data C through the fitted transform."""
        return self.get_working_data(C).dot(self.proj_to_B)
[docs]
class admm_ds:
    """Direct standardization solved with ADMM under several regularizers.

    Fits a transform matrix P minimizing the least-squares misfit between
    (row-normalized) A @ P and B plus a penalty on P, via the alternating
    direction method of multipliers. The ``reg`` option selects the
    penalty's proximal step: "lasso" (soft threshold), "ridge"
    (closed-form l2 solve), "fused" (soft threshold on D @ P, where D is a
    first-difference operator), "rank" (singular-value threshold), or
    "sp_lr" (average of the lasso and rank steps).
    """
    def __init__(
        self, rho=1, beta=0.2, epsilon=1e-5, max_iter=100, verbose=True, reg="lasso"
    ):
        # rho: ADMM penalty parameter.
        self.rho = rho
        # beta: regularization strength.
        self.beta = beta
        # epsilon: relative-change convergence tolerance on P and Z.
        self.epsilon = epsilon
        # max_iter: iteration cap; a message is printed if it is reached.
        self.max_iter = max_iter
        self.verbose = verbose
        # reg: which regularizer/proximal step to use (see class docstring).
        self.reg = reg
[docs]
    def L2_norm(self, data):
        """Scale each row (2-D input) or the whole vector (1-D input) to
        unit Euclidean norm.

        Returns the normalized data together with the norms ``w`` needed
        by undo_L2_norm to invert the scaling.
        """
        if len(data.shape) > 1:
            w = np.array(np.sqrt(np.sum(np.square(data), axis=1)))
            data_normed = data / w[:, None]
        else:
            w = np.array(np.sqrt(np.sum(np.square(data), axis=0)))
            data_normed = data / w
        return data_normed, w
[docs]
    def undo_L2_norm(self, data_normed, w):
        """Invert L2_norm: rescale rows (or the vector) by the norms w."""
        if len(data_normed.shape) > 1:
            data = data_normed * w[:, None]
        else:
            data = data_normed * w
        return data
[docs]
    def derive_transform(self, A, B):
        """Fit self.proj_to_B by ADMM so that normalized A @ P matches B.

        A and B are row-normalized first; apply_transform performs the
        matching normalize/denormalize on new data. On an unrecognized
        ``reg`` value, proj_to_B is set to None and an error string is
        returned instead of raising.
        """
        n = A.shape[1]
        P = np.zeros((n, n))  # primal variable (the transform matrix)
        Z = P.copy()          # split variable carrying the regularizer
        Y = P.copy()          # (scaled) dual variable
        A, Aw = self.L2_norm(A)
        B, Bw = self.L2_norm(B)
        # Gram and cross-product matrices reused every iteration.
        AtA = np.dot(A.T, A)
        AtB = np.dot(A.T, B)
        if self.reg == "fused":
            # Build D, an n x n first-difference operator: row i is
            # e_i - e_{i+1}, with a final all-zero row.
            iden = np.identity(n - 1)
            pos = np.hstack((iden, np.zeros((len(iden), 1))))
            neg = -1 * np.roll(pos, 1, 1)
            D = np.vstack(((pos + neg), np.zeros((1, len(iden) + 1))))
            P_fact = cho_factor(AtA + self.rho * np.dot(D.T, D))
        else:
            # Cholesky factorization of the P-update system, computed once.
            P_fact = cho_factor(AtA + self.rho * np.eye(n))
        if self.reg == "ridge":
            # Ridge Z-update is a linear solve; factor its system once too.
            Z_fact = cho_factor(2 * np.eye(n) + self.rho * np.eye(n))
        for it in range(self.max_iter):
            # Aliases are safe: P and Z are rebound (not mutated) below.
            last_P = P
            last_Z = Z
            # Z-update: proximal step for the selected regularizer.
            if self.reg == "fused":
                Z = ctu.soft_thresh(np.dot(D, P) + (Y / self.rho), self.beta / self.rho)
            elif self.reg == "rank":
                Z = ctu.svt_thresh(P + (Y / self.rho), self.beta / self.rho)
            elif self.reg == "ridge":
                Z = cho_solve(Z_fact, self.rho * P + Y)
            elif self.reg == "sp_lr":
                # Sparse + low rank: average of the lasso and rank steps.
                Z = (
                    ctu.soft_thresh(P + (Y / self.rho), self.beta / self.rho)
                    + ctu.svt_thresh(P + (Y / self.rho), self.beta / self.rho)
                ) / 2.0
            elif self.reg == "lasso":
                Z = ctu.soft_thresh(P + (Y / self.rho), self.beta / self.rho)
            else:
                self.proj_to_B = None
                return "ERROR: Regularizer not programmed."
            # P-update (Cholesky solve) and dual ascent on Y (in place).
            if self.reg == "fused":
                P = cho_solve(P_fact, AtB + self.rho * np.dot(D.T, Z) - np.dot(D.T, Y))
                Y += self.rho * (np.dot(D, P) - Z)
            else:
                P = cho_solve(P_fact, AtB + self.rho * Z - Y)
                Y += self.rho * (P - Z)
            # Relative change of P and Z between iterations.
            P_conv = norm(P - last_P) / norm(P)
            Z_conv = norm(Z - last_Z) / norm(Z)
            if self.verbose:
                # Diagnostic prints only; they do not affect the solution.
                # num_zero = np.count_nonzero(P<=1e-9)
                if self.reg == "fused":
                    print(
                        it,
                        P_conv,
                        Z_conv,
                        norm(np.dot(D, P) - Z),
                        np.count_nonzero(Z),
                        sum(svdvals(Z)),
                    )
                    print("score: %.4f" % (norm(np.dot(D, P) - Z) + norm(A - B.dot(Z))))
                else:
                    print(
                        it, P_conv, Z_conv, norm(P - Z), np.count_nonzero(Z), norm(Z, 1)
                    )  # ,sum(sp.linalg.svdvals(Z)))
                    print("score: %.4f" % (norm(P - Z) + norm(A - B.dot(Z))))
            if P_conv <= self.epsilon and Z_conv <= self.epsilon:
                break
        else:
            # for/else: reached only when the loop never hit `break`.
            print("Didn't converge in %d steps" % self.max_iter)
        self.proj_to_B = P
[docs]
    def apply_transform(self, C):
        """Row-normalize C, project through proj_to_B, then restore the
        original row scales."""
        C, Cw = self.L2_norm(C)
        C_proj_to_B = np.dot(C, self.proj_to_B)
        C_proj_to_B = self.undo_L2_norm(C_proj_to_B, Cw)
        return C_proj_to_B
[docs]
class cca:
    """Calibration transfer via canonical correlation analysis.

    With ccatype=None the fitted sklearn CCA model predicts B-space data
    directly. With ccatype="new", an explicit projection matrix
    proj_to_B is built from the canonical scores (see
    http://onlinelibrary.wiley.com/doi/10.1002/cem.2637/abstract).
    """

    def __init__(self, n_components=1, ccatype=None):
        self.n_components = n_components
        self.ccatype = ccatype

    def derive_transform(self, A, B):
        """Fit the CCA model on (A, B); for the "new" variant also build
        the explicit transform matrix proj_to_B."""
        self.model = CCA(n_components=self.n_components, scale=False)
        self.model.fit(A, B)
        if self.ccatype != "new":
            return self.model
        scores_A, scores_B = self.model.transform(A, B)
        # Map A-scores to B-scores, then B-scores back to B.
        F1 = np.linalg.pinv(scores_A).dot(scores_B)
        F2 = np.linalg.pinv(scores_B).dot(B)
        self.proj_to_B = ctu.multi_dot((self.model.x_weights_, F1, F2))

    def apply_transform(self, C):
        """Transform new data C into B-space."""
        data = np.array(C)
        if self.ccatype == "new":
            return data.dot(self.proj_to_B)
        if len(data.shape) == 1:
            # sklearn predict() requires a 2-D array.
            data = data.reshape(1, -1)
        return self.model.predict(data)
[docs]
class ipd_ds:
    """DS transform fit by incremental proximal descent (Bertsekas 2010).

    Each iteration takes a gradient step on the least-squares misfit,
    then applies singular-value thresholding and soft thresholding in
    sequence, encouraging a low-rank and sparse transform.
    """

    def __init__(
        self, t=0.0002, svt=10, l1=10, epsilon=1e-5, max_iter=50, verbose=True
    ):
        # t: step size; svt / l1: thresholding weights;
        # epsilon: relative-change convergence tolerance.
        self.t = t
        self.svt = svt
        self.l1 = l1
        self.epsilon = epsilon
        self.max_iter = max_iter
        self.verbose = verbose

    def derive_transform(self, A, B):
        """Fit proj_to_B so that (row-normalized) A @ P approximates B."""
        A = normalize(A, axis=1)
        B = normalize(B, axis=1)
        gram = np.dot(A.T, A)
        cross = np.dot(A.T, B)
        P = np.eye(A.shape[1])  # start from the identity transform
        for it in range(self.max_iter):
            prev = P.copy()
            # Gradient step on ||A P - B||^2, then the two proximal steps.
            P = P - self.t * (np.dot(gram, P) - cross)
            P = ctu.svt_thresh(P, self.svt * self.t)
            P = ctu.soft_thresh(P, self.l1 * self.t)
            P_conv = norm(P - prev) / norm(P)
            if self.verbose:
                print(it, P_conv, norm(B - A.dot(P)), norm(P, 1))
            if P_conv <= self.epsilon:
                break
        else:
            # Loop exhausted without hitting the break.
            print("Didn't converge in %d steps" % self.max_iter)
        self.proj_to_B = P
[docs]
class forward_backward_ds:
    """DS transform fit by a forward-backward splitting scheme.

    Each iteration takes a gradient step on the least-squares misfit and
    combines the reflections through the singular-value and soft
    thresholding proximal operators, averaging the two auxiliary
    iterates.
    """

    def __init__(self, t=0.001, svt=1, l1=1, epsilon=1e-5, max_iter=20, verbose=True):
        # t: step size; svt / l1: thresholding weights;
        # epsilon: relative-change convergence tolerance.
        self.t = t
        self.svt = svt
        self.l1 = l1
        self.epsilon = epsilon
        self.max_iter = max_iter
        self.verbose = verbose

    def derive_transform(self, A, B):
        """Fit proj_to_B so that (row-normalized) A @ P approximates B."""
        n = B.shape[1]
        # Fix: initialize the transform and both auxiliary variables as
        # explicit n x n matrices. The original used 1-D zero vectors of
        # length n, which produced the same numbers only through NumPy
        # broadcasting against the (n, n) gradient — correct by accident
        # and misleading to read.
        P = np.zeros((n, n))
        Z1 = np.zeros((n, n))
        Z2 = np.zeros((n, n))
        A = normalize(A, axis=1)
        B = normalize(B, axis=1)
        AtA = np.dot(A.T, A)
        AtB = np.dot(A.T, B)
        for it in range(self.max_iter):
            last_P = P.copy()
            G = np.dot(AtA, P) - AtB  # gradient of the least-squares term
            Z1 = ctu.svt_thresh(2 * P - Z1 - self.t * G, 2 * self.svt * self.t)
            Z2 = ctu.soft_thresh(2 * P - Z2 - self.t * G, 2 * self.l1 * self.t)
            P = (Z1 + Z2) / 2.0
            P_conv = norm(P - last_P) / norm(P)
            if self.verbose:
                print(it, P_conv)
            if P_conv <= self.epsilon:
                break
        else:
            # Loop exhausted without hitting the break.
            print("Didn't converge in %d steps" % self.max_iter)
        self.proj_to_B = P
[docs]
def call_cal_tran(
    A,
    B,
    C,
    dataAmatchcol,
    dataBmatchcol,
    params,
    spect_label="wvl",
    dataAname="A",
    dataBname="B",
    dataCname="C",
):
    """Fit a calibration transfer mapping A onto B, then apply it to C.

    A, B, and C are data frames. The transform is fit to align A to B,
    then applied to C. Rows of A and B are paired up by
    ``dataAmatchcol`` / ``dataBmatchcol`` via ctu.prepare_data before
    fitting. ``params`` must contain a "method" key (it is popped inside
    cal_tran, so the caller's dict is mutated). Note that C's
    ``spect_label`` columns are overwritten in place, so the caller's
    data frame is modified as well as returned.

    Returns the transformed C and the fitted cal_tran object.
    """
    print(
        "Running calibration transfer. The transformation will be fit to "
        "make " + dataAname + " match " + dataBname
    )
    print("Transformation will then be applied to " + dataCname)
    # NOTE(review): the names passed to these checks do not match the
    # data arguments (A is compared to C but labeled A/B; B to C labeled
    # B/C). Presumably the first call should compare A and B — confirm
    # against ctu.check_data's contract.
    ctu.check_data(A, C, dataAname, dataBname, spect_label=spect_label)
    ctu.check_data(B, C, dataBname, dataCname, spect_label=spect_label)
    A_mean, B_mean = ctu.prepare_data(A, B, dataAmatchcol, dataBmatchcol)
    ct_obj = cal_tran(params)
    ct_obj.derive_transform(A_mean[spect_label].values, B_mean[spect_label].values)
    C[spect_label] = ct_obj.apply_transform(C[spect_label])
    return C, ct_obj