Source code for libpyhat.spectral_data

import copy

import numpy as np
import pandas as pd

import libpyhat.clustering.cluster as cluster
import libpyhat.emi.emi as endmember_identify
import libpyhat.transform.cal_tran as cal_tran
import libpyhat.transform.cal_tran_cv as cal_tran_cv
import libpyhat.transform.deriv as deriv
import libpyhat.transform.dim_red as dim_red
import libpyhat.transform.interp as interp
import libpyhat.transform.mask as mask
import libpyhat.transform.multiply_vector as multiply_vector
import libpyhat.transform.norm as norm
import libpyhat.transform.peak_area as peak_area
import libpyhat.transform.remove_baseline as remove_baseline
import libpyhat.transform.scale as scale
import libpyhat.transform.shift_spect as shift_spect
import libpyhat.utils.folds as folds
import libpyhat.utils.outlier_identify as outlier_identify
from libpyhat.Unmixing import unmix
from libpyhat.utils.lookup import lookup
from libpyhat.utils.utils import enumerate_duplicates
from libpyhat.utils.utils import remove_rows


class SpectralData(object):
    """This class is the native object used to store spectral data in PyHAT.

    Image cubes, point spectra, etc. will be translated into this object,
    and this object will be passed around to PyHAT functionalities. Where
    necessary, those functionalities will translate the class into the
    formats needed by scikit-learn and other packages or functions,
    according to their respective API/interfacing requirements.

    Parameters:
        df : a pandas dataframe that has a particular multi-index structure

    Notes:
        The structure of the pandas dataframe required by this class is as
        follows::

              | meta              | meta                      | ... | wvl              | ... | comp                 | ...
              | metadata_category | another_metadata_category | ... | wavelength_value | ... | composition_category | ...
            0 | val               | val                       | ... | val              | ... | val                  | ...
            1 | val               | val                       | ... | val              | ... | val                  | ...
            ...
            N | val               | val                       | ... | val              | ... | val                  | ...

        Metadata categories can be strings, floats, or ints, and have no
        expected or enforced datatypes. Common practice, however, is for
        these categories to be strings: "target_name",
        "latitude [degrees]", etc.

        An attempt will be made to convert all level-two header values to
        floats. This process is expected to fail for non-numerical strings,
        like most metadata and composition categories. Wavelength values,
        however, are expected to be ints or floats. Failure of this
        particular conversion, such as when special characters are included
        (e.g. '<125' for intensities at wavelengths less than 125
        wavelength units), will result in an error. The rename-column
        functionality in PyHAT can help the user address this after the
        class is instantiated.

        Composition category names can also be strings, floats, or ints,
        and have no expected or enforced datatypes. Common practice,
        however, is for these categories to be strings, such as "MnO [ppm]"
        and "Olivine [wt%]".

        Spectral intensities are expected to be numeric, and an attempt
        will be made to convert them to float. Failure of this process will
        generate a warning message. This can happen when a non-numeric
        value or string is present, such as an intensity that reads '<12.5'
        or '~12.5'. The user can use class features to convert these
        intensities to numerical values.

        To-do: Introduce a class feature to convert composition or spectral
        intensity values to a numerical value of the user's choice, e.g.
        ConvertLessThanToValue(data, value='0').

        To-do: Introduce this functionality: the indexes in the first
        column can be provided by the user but, if missing, will be
        assigned. They will be enforced to start from 0 and count up to the
        number of spectra, N.

        To-do: Explicit handling of identical combinations of first- and
        second-level columns, whether tuples or otherwise, e.g.
        ('meta', 'target_type') appearing twice in the same dataset.

        To-do: Handle the case where the columns are neither tuples nor in
        the native format.

        Spectra datasets do not need *all* three expected top-level column
        headers ('wvl', 'meta', 'comp'), but at least one of them must be
        present. If none are present, an exception is thrown, interrupting
        class instantiation. If the user provides data for a column type
        that is not in the expected list, this data will be dropped. The
        user has the ability to set the required top-level columns (see the
        __init__ args); however, certain PyHAT functionalities expect the
        presence of certain columns.
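
    Examples:
        A minimal sketch of building a conforming data frame by hand and
        wrapping it in this class. All category names, wavelengths, and
        values below are illustrative, not taken from a real dataset::

            >>> import pandas as pd
            >>> columns = pd.MultiIndex.from_tuples(
            ...     [("meta", "target_name"),
            ...      ("wvl", 585.149), ("wvl", 585.183),
            ...      ("comp", "SiO2")])
            >>> df = pd.DataFrame(
            ...     [["Sample_A", 1250.3, 1261.7, 47.2],
            ...      ["Sample_B", 980.1, 1002.5, 51.8]],
            ...     columns=columns)
            >>> data = SpectralData(df, name="example")
            >>> data.wvls
            array([585.149, 585.183])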
""" def __init__( self, df, name=None, meta_label="meta", spect_label="wvl", comp_label="comp", geodata=None, ): self.name = name self.geodata = geodata # this keyword lets us carry geodata info # along if we are working with an orbital cube self.meta_label = meta_label self.spect_label = spect_label self.comp_label = comp_label top_level_columns = [] if meta_label is not None: try: df[meta_label] top_level_columns.append(meta_label) except: print( "The specified metadata label (" + meta_label + ") was " "not " "found " "in the " "data " "frame!" ) print("Setting meta_label to None") self.meta_label = None if comp_label is not None: try: df[comp_label] top_level_columns.append(comp_label) except: print( "The specified composition label (" + comp_label + ") " "was " "not " "found in the data frame!" ) print("Setting comp_label to None") self.comp_label = None if spect_label is not None: try: df[spect_label] top_level_columns.append(spect_label) except: print( "The specified spectral data label (" + spect_label + ") " "was not found in the data frame!" ) print("Setting spect_label to None") self.spect_label = None # Check to make sure that at least one of the expected top level # columns # are present. If not, raise exception. if top_level_columns == []: raise Exception( "ERROR: The data frame does not contain columns with the " "labels " + meta_label + ", " + comp_label + ", or " + spect_label + "! Check your inputs and try again." ) # Attempt to get the top level and second level column names try: df.columns.levels[0] list(df.columns.levels[1].values) # If the columns are not multiindexes, then we will *assume* that # they are tuples that can be converted. Anything that isn't a tuple # will be removed. An example of this would be a dataset that looks # like this: # |('meta','target_name') | ('wvl',125.5) # 0 | 'Made_Up_Name' | 12345.3 except: print( "WARNING: The spectra dataset is not in PyHAT's native \ multi-index format. \ \nIt will be converted assuming column labels " "are tuples suitable for converstion to multi-index. \ \nPlease check that this has been done correctly" ) # Build list of tuples to drop to_drop = [] # Loop through the columns for i in range(len(df.columns)): # Check if the ith column is a tuple if not isinstance(df.columns[i], tuple): # If not, add the column to the drop list print( "WARNING: " + str(df.columns[i]) + " is not a tuple \ (this can be caused by duplicate column names). Removing " "this column." ) to_drop.append(df.columns[i]) # If the ith column is a tuple, check to see if the top-level # value is in the default list of top level columns elif df.columns[i][0] not in top_level_columns: print( "WARNING: You have provided data with a top-level \ column %s that does not " "match the specified " "top-level column: \ %s. *This data will be dropped.* You can either " "reformat\ your dataset or change the top-level column " "labels." % (df.columns[i][0], top_level_columns) ) to_drop.append(df.columns[i]) # Drop the list of columns that are not tuples df.drop(columns=to_drop, inplace=True) # Now we can generate the multiindex columns df.columns = pd.MultiIndex.from_tuples(list(df.columns)) new_columns = [] for col in df.columns: col = list(col) # Try to turn the second level column headers into floats if # possible, which is relevant # to wavelength values try: col[1] = float(col[1]) # This will generally fail for metadata categories since these are # usually non-numeric character strings there. 
except: if col[0] == self.spect_label: # If this fails for a wavelength column, let the user know. # This can cause issues with PyHAT analyses print( "WARNING: The wavelength value " + str(col[1]) + " failed to " "be converted to a float. The value will " "be kept as-is," " but this should be addressed by the user." ) new_columns.append(tuple(col)) # Set the columns to their formatted versions df.columns = pd.MultiIndex.from_tuples(new_columns) # Try to convert spectral intensities to float if self.spect_label is not None: try: df[spect_label] = df[spect_label].apply(pd.to_numeric, errors="raise") except: print( "WARNING: There are spectral intensities that are " "non-numeric. These have failed float conversion and " "could impact analysis." ) df[spect_label] = df[spect_label].apply(pd.to_numeric, errors="ignore") # store the df in the object self.df = df if self.spect_label is not None: self.get_wvls() else: self.wvls = None
    def get_wvls(self):
        self.wvls = self.df[self.spect_label].columns.values

    def cal_tran(self, A, B, dataAmatchcol, dataBmatchcol, params, Aname, Bname):
        self.df, self.ct_obj = cal_tran.call_cal_tran(
            A,
            B,
            self.df,
            dataAmatchcol,
            dataBmatchcol,
            params,
            spect_label=self.spect_label,
            dataAname=Aname,
            dataBname=Bname,
            dataCname=self.name,
        )

    def cal_tran_cv(self, B, dataAmatchcol, dataBmatchcol, paramgrid, Bname):
        self.ct_cv_results = cal_tran_cv.call_cal_tran_cv(
            self.df,
            B,
            dataAmatchcol,
            dataBmatchcol,
            paramgrid,
            spect_label=self.spect_label,
            dataAname=self.name,
            dataBname=Bname,
        )

    def cluster(self, col, method, params, kws):
        self.df = cluster.cluster(self.df, col, method=method, params=params, kws=kws)

    def combine_spectral_data(self, data2):
        self.df[(self.meta_label, "Dataset")] = self.name
        data2.df[(self.meta_label, "Dataset")] = data2.name
        new_data = SpectralData(
            pd.concat([self.df, data2.df], ignore_index=True),
            meta_label=self.meta_label,
            spect_label=self.spect_label,
            comp_label=self.comp_label,
        )
        return new_data

    def copy_spectral_data(self, new_name):
        new_data = copy.deepcopy(self)
        new_data.name = new_name
        return new_data

    def deriv(self):
        self.df = deriv.deriv(self.df, spect_label=self.spect_label)

    def dim_red(self, col, method, params, kws, load_fit, ycol=None):
        self.df, self.do_dim_red = dim_red.dim_red(
            self.df, col, method, params, kws, load_fit=load_fit, ycol=ycol
        )

    def interp(self, xnew):
        self.df = interp.interp(self.df, xnew, spect_label=self.spect_label)

    def shift(self, shift):
        self.df = shift_spect.shift_spect(self.df, shift, spect_label=self.spect_label)

    def mask(self, maskfile, maskvar):
        self.df = mask.mask(self.df, maskfile, maskvar=maskvar)

    def multiply_vector(self, vectorfile):
        self.df = multiply_vector.multiply_vector(
            self.df, vectorfile=vectorfile, spect_label=self.spect_label
        )

    def norm(self, ranges, col_var):
        self.df = norm.norm(self.df, ranges, col_var=col_var)

    def outlier_identify(self, col, method, params):
        self.df = outlier_identify.outlier_identify(
            self.df, col=col, method=method, params=params
        )

    def endmember_identify(self, col, method, n_endmembers):
        self.df, indices = endmember_identify.emi(
            self.df, col=col, emi_method=method, n_endmembers=n_endmembers
        )

    def peak_area(self, peaks_mins_file):
        self.df, self.peaks, self.mins = peak_area.peak_area(
            self.df, peaks_mins_file=peaks_mins_file, spect_label=self.spect_label
        )
        self.spect_label = "peak_area"

    def random_folds(self, nfolds):
        self.df = folds.random(self.df, nfolds, meta_label=self.meta_label)

    def remove_baseline(self, method, segment, params):
        self.df, self.df_baseline = remove_baseline.remove_baseline(
            self.df,
            method=method,
            segment=segment,
            params=params,
            spect_label=self.spect_label,
        )

    def stratified_folds(self, nfolds, col, tiebreaker, comp_label="comp"):
        self.df = folds.stratified_folds(
            self.df,
            nfolds=nfolds,
            sortby=(comp_label, col),
            tiebreaker=(comp_label, tiebreaker),
        )

    def enumerate_duplicates(self, col):
        self.df = enumerate_duplicates(self.df, col=col)

    def scale(self, df_to_fit=None):
        self.df, self.scaler = scale.do_scale(
            self.df, df_to_fit, spect_label=self.spect_label
        )

    def unmix(self, endmembers_df, method, params, normalize):
        endmembers = endmembers_df.iloc[
            np.squeeze(np.array(endmembers_df["endmembers"] == 1)), :
        ]
        results = unmix.unmix(
            np.array(self.df[self.spect_label]),
            endmembers[self.spect_label],
            method,
            params=params,
            normalize=normalize,
        )
        return results

    def lookup(self, lookupdata, left_on, right_on):
        self.df = lookup(
            self.df, lookupdf=lookupdata, left_on=left_on, right_on=right_on
        )

    def remove_rows(self, matching_values):
        self.df = remove_rows(self.df, matching_values, spect_label=self.spect_label)

    def closest_wvl(self, input_wvls):
        wvls = self.df[self.spect_label].columns.values
        output_wvls = []
        for w in input_wvls:
            idx = (np.abs(wvls - w)).argmin()
            output_wvls.append(wvls[idx])
        return output_wvls

    def remove_unnamed(self):
        # Handle unnamed columns from the input data by removing them.
        colmask = self.df.columns.levels[0].str.match("Unnamed")
        if np.max(colmask) > 0:
            print("Removing unnamed columns:")
            print(self.df.columns.levels[0][colmask])
            good_data = []
            for c in self.df.columns.levels[0][~colmask]:
                data_tmp = self.df[c]
                data_tmp.columns = pd.MultiIndex.from_tuples(
                    [(c, col) for col in data_tmp.columns.values]
                )
                good_data.append(data_tmp)
            self.df = pd.concat(good_data, axis=1)

    def remove_duplicates(self):
        try:
            # Remove wvl columns whose labels cannot be parsed as floats.
            data_wvl = self.df[self.spect_label]
            data_no_wvl = self.df.drop(columns=self.spect_label)
            good_wvls = []
            for i in data_wvl.columns:
                try:
                    i = float(i)
                    good_wvls.append(True)
                except (ValueError, TypeError):
                    print("Removing column " + str(i))
                    good_wvls.append(False)
            data_wvl = data_wvl.iloc[:, good_wvls]
            data_wvl.columns = pd.MultiIndex.from_tuples(
                [(self.spect_label, float(i)) for i in data_wvl.columns]
            )
            self.df = pd.merge(
                data_no_wvl, data_wvl, left_index=True, right_index=True
            )
        except Exception:
            pass

""" def m3_params(self, paramname=None): if paramname is not None: if paramname == "R540": m3.r540(self) elif paramname == "R750": m3.r750(self) elif paramname == "R1580": m3.r1580(self) elif paramname == "R2780": m3.r2780(self) elif paramname == "VISNIR": m3.visnir(self) elif paramname == "R950_750": m3.r950_750(self) elif paramname == "2um_Ratio": m3.twoum_ratio(self) elif paramname == "Thermal_Ratio": m3.thermal_ratio(self) elif paramname == "Vis_Slope": m3.visslope(self) elif paramname == "1um_Slope": m3.oneum_slope(self) elif paramname == "2um_Slope": m3.twoum_slope(self) elif paramname == "BD620": m3.bd620(self) elif paramname == "BD950": m3.bd950(self) elif paramname == "BD1050": m3.bd1050(self) elif paramname == "BD1250": m3.bd1250(self) elif paramname == "BD3000": m3.bd3000(self) elif paramname == "BD1900": m3.bd1900(self) elif paramname == "BD2300": m3.bd2300(self) elif paramname == "BDI1000": m3.bdi1000(self) elif paramname == "BDI2000": m3.bdi2000(self) elif paramname == "OLINDEX": m3.olindex(self) elif paramname == "1um_min": m3.oneum_min(self) elif paramname == "1um_FWHM": m3.oneum_fwhm(self) elif paramname == "1um_symmetry": m3.oneum_sym(self) elif paramname == "BD1um_ratio": m3.bd1um_ratio(self) elif paramname == "BD2um_ratio": m3.bd2um_ratio(self) else: print(paramname + " is not recognized as a M3 summary " "parameter!") def crism_params(self, paramname=None): if paramname is not None: if paramname == "R440": crism.r440(self) elif paramname == "R530": crism.r530(self) elif paramname == "R600": crism.r600(self) elif paramname == "R770": crism.r770(self) elif paramname == "R1080": crism.r1080(self) elif paramname == "R1300": crism.r1300(self) elif paramname == "R1330": crism.r1330(self) elif paramname == "R1506": crism.r1506(self) elif paramname == "R2529": crism.r2529(self) elif paramname == "R3920": crism.r3920(self) elif paramname == "Red/Blue Ratio": crism.rbr(self) elif paramname == "BD530": crism.bd530_2(self) elif paramname == "BD640": crism.bd640_2(self) elif paramname == "BD860": crism.bd860_2(self) elif paramname == "BD920": crism.bd920_2(self) elif paramname == "BD1300": crism.bd1300(self) elif paramname == "BD1400": crism.bd1400(self) elif paramname == "BD1435": crism.bd1435(self) elif paramname == "BD1500": crism.bd1500_2(self) elif paramname == "BD1750": crism.bd1750_2(self) elif paramname == "BD1900": crism.bd1900_2(self) elif paramname == "BD1900r2": crism.bd1900r2(self) elif paramname == "BD2190": crism.bd2190(self) elif paramname == "BD2190": crism.bd2190(self) elif paramname == "BD2100": crism.bd2100_2(self) elif paramname == "BD2165": crism.bd2165(self) elif paramname == "BD2210": crism.bd2210_2(self) elif paramname == "BD2230": crism.bd2230(self) elif paramname == "BD2250": crism.bd2250(self) elif paramname == "BD2265": crism.bd2265(self) elif paramname == "BD2290": crism.bd2290(self) elif paramname == "BD2355": crism.bd2355(self) elif paramname == "BD2500h": crism.bd2500h_2(self) elif paramname == "BD2600": crism.bd2600(self) elif paramname == "BD3000": crism.crism_bd3000(self) elif paramname == "BD3100": crism.bd3100(self) elif paramname == "BD3200": crism.bd3200(self) elif paramname == "BD3400": crism.bd3400_2(self) elif paramname == "BDI1000VIS": crism.bdi1000VIS(self) elif paramname == "BDI1000IR": crism.bdi1000IR(self) elif paramname == "BDI2000": crism.crism_bdi2000(self) elif paramname == "SH600": crism.sh600_2(self) elif paramname == "SH770": crism.sh770(self) elif paramname == "SINDEX2": crism.sindex2(self) elif paramname == "CINDEX2": 
crism.cindex2(self) elif paramname == "RPEAK1": crism.rpeak1(self) elif paramname == "OLINDEX3": crism.olivine_index3(self) elif paramname == "LCPINDEX2": crism.lcp_index2(self) elif paramname == "HCPINDEX2": crism.hcp_index2(self) elif paramname == "ISLOPE1": crism.islope1(self) elif paramname == "ICER1_2": crism.icer1_2(self) elif paramname == "DOUB2200H": crism.doub2200h(self) elif paramname == "MIN2200": crism.min2200(self) elif paramname == "D2200": crism.d2200(self) elif paramname == "MIN2250": crism.min2250(self) elif paramname == "D2300": crism.d2300(self) elif paramname == "MIN2295_2480": crism.min2295_2480(self) elif paramname == "MIN2345_2537": crism.min2345_2537(self) elif paramname == "IRR1": crism.irr1(self) elif paramname == "IRR2": crism.irr2(self) elif paramname == "IRR3": crism.irr3(self) else: print(paramname + " is not recognized as a CRISM summary " "parameter!") """ # sometimes (e.g. when loading data) we end up with spectra containing # nans. # This removes any spectrum with a NaN
    def remove_empty_spectra(self):
        nan_mask = self.df[self.spect_label].isna().any(axis=1)
        if nan_mask.any():
            print(
                str(np.sum(nan_mask))
                + " spectra containing NaNs identified! These will be removed."
            )
            self.df = self.df.iloc[np.array(~nan_mask), :]
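
# A minimal usage sketch, executed only when this module is run directly.
# The file name "spectra.csv" and the wavelength 585.0 below are
# illustrative assumptions, not part of the PyHAT API.
if __name__ == "__main__":
    # Read a CSV whose first two header rows form the multi-index columns.
    demo_df = pd.read_csv("spectra.csv", header=[0, 1])
    demo = SpectralData(demo_df, name="demo")
    # Drop any spectrum containing NaN intensities.
    demo.remove_empty_spectra()
    # Report the measured wavelength closest to a requested value.
    print(demo.closest_wvl([585.0]))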