import copy
import numpy as np
import pandas as pd
import libpyhat.clustering.cluster as cluster
import libpyhat.emi.emi as endmember_identify
import libpyhat.transform.cal_tran as cal_tran
import libpyhat.transform.cal_tran_cv as cal_tran_cv
import libpyhat.transform.deriv as deriv
import libpyhat.transform.dim_red as dim_red
import libpyhat.transform.interp as interp
import libpyhat.transform.mask as mask
import libpyhat.transform.multiply_vector as multiply_vector
import libpyhat.transform.norm as norm
import libpyhat.transform.peak_area as peak_area
import libpyhat.transform.remove_baseline as remove_baseline
import libpyhat.transform.scale as scale
import libpyhat.transform.shift_spect as shift_spect
import libpyhat.utils.folds as folds
import libpyhat.utils.outlier_identify as outlier_identify
from libpyhat.Unmixing import unmix
from libpyhat.utils.lookup import lookup
from libpyhat.utils.utils import enumerate_duplicates
from libpyhat.utils.utils import remove_rows
class SpectralData(object):
"""This class is the native object used to store spectral data in PyHAT.
Image cubes, point spectra, etc. will be translated into this object and
this object will be passed around to PyHAT functionalities. Where
necessary,
those functionalities will translate the class into the necessary formats
for scikit-learn and other packages or functions according to their
respective API/interfacing requirements.
Parameters:
object : a pandas dataframe that has a particular multi-index structure
Notes: The structure of the pandas dataframe required by this class is
as follows:
|meta|meta|...|wvl|wvl|...|comp|comp|...
|metadata_category|another_metadata_category
...|wavelength_value|wavelength_value|...|composition_category
|composition_category|...}
0 |val|val|...|val|val|...|val|val|...
1 |val|val|...|val|val|...|val|val|...
...
N |val|val|...|val|val|...|val|val|...
Metadata categories can be strings, floats, ints, and have no expected
or enforced datatypes. However, common practice is that these categories
are strings, "target_name", "latitude [degrees]", etc.
An attempt will be made to convert all level-two header values to
floats. This
process is expected to fail for non-numerical strings, like most metadata
and composition categories. However, wavelength values are expected to
be ints or
floats. Failure in this particular conversion, such as if special
characters are
included, e.g. '<125', for intensities at wavelengths less than 125
wavelength
units, will result in an error. The rename column functionality in
PyHAT can
help the user address this after the class is instantiated.
Composition category names can also be strings, floats, ints, and have no
expected or enforced datatypes. However, common practice is that these
categories are strings, such as "MnO [ppm]" and "Olivine [wt%]".
Spectral intensities are expected to be numeric and an attempt to
convert them
to float will be made. Failure of this process will generate a warning
message.
This can happen when non-numeric value or non-numeric string was
present, such
as when the intensity is reads, '<12.5' or '~12.5'. The user can use
class features
to convert these intensities to numerical values.
To-do:
Introduce the class feature to convert composition or spectral
intensity values
to a numerical value of the user's choice,
e.g ConvertLessThanToValue(data, value='0').
To-do:
Introduce this functionality: The indexes in the first column can be
provided
by the user, but if missing, will be assigned. They will be enforced
to start
from 0 and count up to the number of spectra, N.
To-do:
Explicit handling of identical combinations of 1st and 2nd level
columns, whether
tuples or otherwise, e.g. ('meta','target_type') and ('meta',
'target_type') both
being in the same dataset.
To-do: We need to handle the case where the columns are not tuples,
nor in the
native format.
Spectra datasets do not need *all* three expected top-level column
headers
('wvl', 'meta', 'comp'), but one of them needs to be present. If none
are present,
an exception is thrown and interrupt class instantiation.
If the user provides data for a column type that is not in the
expected list,
this data will be dropped.
The user has the ability to set the required top-level columns (see
__init__ args),
however certain PyHAT functionalities expect the presence of certain
columns.
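
    Example (a minimal, hypothetical dataframe illustrating the expected
    structure; the column names and values below are illustrative only):

        >>> import pandas as pd
        >>> columns = pd.MultiIndex.from_tuples(
        ...     [('meta', 'target_name'), ('wvl', 585.2), ('wvl', 585.4),
        ...      ('comp', 'SiO2 [wt%]')])
        >>> df = pd.DataFrame([['Sample_A', 1204.7, 1310.2, 48.3]],
        ...                   columns=columns)
        >>> spectra = SpectralData(df, name='example')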
"""
def __init__(
self,
df,
name=None,
meta_label="meta",
spect_label="wvl",
comp_label="comp",
geodata=None,
):
self.name = name
self.geodata = geodata # this keyword lets us carry geodata info
# along if we are working with an orbital cube
self.meta_label = meta_label
self.spect_label = spect_label
self.comp_label = comp_label
top_level_columns = []
        if meta_label is not None:
            try:
                df[meta_label]
                top_level_columns.append(meta_label)
            except KeyError:
                print(
                    "The specified metadata label ("
                    + meta_label
                    + ") was not found in the data frame!"
                )
                print("Setting meta_label to None")
                self.meta_label = None
        if comp_label is not None:
            try:
                df[comp_label]
                top_level_columns.append(comp_label)
            except KeyError:
                print(
                    "The specified composition label ("
                    + comp_label
                    + ") was not found in the data frame!"
                )
                print("Setting comp_label to None")
                self.comp_label = None
        if spect_label is not None:
            try:
                df[spect_label]
                top_level_columns.append(spect_label)
            except KeyError:
                print(
                    "The specified spectral data label ("
                    + spect_label
                    + ") was not found in the data frame!"
                )
                print("Setting spect_label to None")
                self.spect_label = None
        # Check that at least one of the expected top-level columns is
        # present. If not, raise an exception.
        if not top_level_columns:
            raise Exception(
                "ERROR: The data frame does not contain columns with the "
                "labels " + str(meta_label) + ", " + str(comp_label)
                + ", or " + str(spect_label)
                + "! Check your inputs and try again."
            )
        # Attempt to access the top-level and second-level column names
        try:
            df.columns.levels[0]
            list(df.columns.levels[1].values)
        # If the columns are not a MultiIndex, then we will *assume* that
        # they are tuples that can be converted. Anything that isn't a tuple
        # will be removed. An example of this would be a dataset that looks
        # like this:
        #   |('meta','target_name') | ('wvl',125.5)
        # 0 | 'Made_Up_Name'        | 12345.3
        except AttributeError:
            print(
                "WARNING: The spectra dataset is not in PyHAT's native "
                "multi-index format.\n"
                "It will be converted assuming column labels are tuples "
                "suitable for conversion to multi-index.\n"
                "Please check that this has been done correctly."
            )
            # Build the list of column labels to drop
            to_drop = []
            # Loop through the columns
            for i in range(len(df.columns)):
                # Check if the ith column label is a tuple
                if not isinstance(df.columns[i], tuple):
                    # If not, add the column to the drop list
                    print(
                        "WARNING: " + str(df.columns[i]) + " is not a tuple "
                        "(this can be caused by duplicate column names). "
                        "Removing this column."
                    )
                    to_drop.append(df.columns[i])
                # If the ith column is a tuple, check whether its top-level
                # value is in the list of expected top-level columns
                elif df.columns[i][0] not in top_level_columns:
                    print(
                        "WARNING: You have provided data with a top-level "
                        "column %s that does not match the specified "
                        "top-level columns: %s. *This data will be "
                        "dropped.* You can either reformat your dataset or "
                        "change the top-level column labels."
                        % (df.columns[i][0], top_level_columns)
                    )
                    to_drop.append(df.columns[i])
            # Drop the columns that are not tuples or whose top-level label
            # is not in the expected list
            df.drop(columns=to_drop, inplace=True)
            # Now we can generate the multi-index columns
            df.columns = pd.MultiIndex.from_tuples(list(df.columns))
        new_columns = []
        for col in df.columns:
            col = list(col)
            # Try to convert the second-level column headers to floats where
            # possible, which is relevant for wavelength values
            try:
                col[1] = float(col[1])
            # This will generally fail for metadata categories, since those
            # are usually non-numeric strings.
            except (ValueError, TypeError):
                if col[0] == self.spect_label:
                    # If this fails for a wavelength column, let the user
                    # know. This can cause issues with PyHAT analyses.
                    print(
                        "WARNING: The wavelength value " + str(col[1])
                        + " failed to be converted to a float. The value "
                        "will be kept as-is, but this should be addressed "
                        "by the user."
                    )
            new_columns.append(tuple(col))
        # Set the columns to their formatted versions
        df.columns = pd.MultiIndex.from_tuples(new_columns)
        # Try to convert spectral intensities to float
        if self.spect_label is not None:
            try:
                df[spect_label] = df[spect_label].apply(
                    pd.to_numeric, errors="raise"
                )
            except (ValueError, TypeError):
                print(
                    "WARNING: There are spectral intensities that are "
                    "non-numeric. These have failed float conversion and "
                    "could impact analysis."
                )
                # Keep the non-convertible values as-is
                df[spect_label] = df[spect_label].apply(
                    pd.to_numeric, errors="ignore"
                )
# store the df in the object
self.df = df
if self.spect_label is not None:
self.get_wvls()
else:
self.wvls = None
def get_wvls(self):
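        """Cache the array of wavelength column values from the spectral
        block in self.wvls."""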
self.wvls = self.df[self.spect_label].columns.values
def cal_tran(self, A, B, dataAmatchcol, dataBmatchcol, params, Aname, Bname):
self.df, self.ct_obj = cal_tran.call_cal_tran(
A,
B,
self.df,
dataAmatchcol,
dataBmatchcol,
params,
spect_label=self.spect_label,
dataAname=Aname,
dataBname=Bname,
dataCname=self.name,
)
def cal_tran_cv(self, B, dataAmatchcol, dataBmatchcol, paramgrid, Bname):
self.ct_cv_results = cal_tran_cv.call_cal_tran_cv(
self.df,
B,
dataAmatchcol,
dataBmatchcol,
paramgrid,
spect_label=self.spect_label,
dataAname=self.name,
dataBname=Bname,
)
def cluster(self, col, method, params, kws):
self.df = cluster.cluster(self.df, col, method=method, params=params, kws=kws)
def combine_spectral_data(self, data2):
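        """Concatenate this dataset with data2 into a new SpectralData,
        tagging each row with its source dataset name in a
        (meta_label, 'Dataset') column."""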
self.df[(self.meta_label, "Dataset")] = self.name
data2.df[(self.meta_label, "Dataset")] = data2.name
new_data = SpectralData(
pd.concat([self.df, data2.df], ignore_index=True),
meta_label=self.meta_label,
spect_label=self.spect_label,
comp_label=self.comp_label,
)
return new_data
def copy_spectral_data(self, new_name):
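        """Return a deep copy of this SpectralData under a new name."""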
new_data = copy.deepcopy(self)
new_data.name = new_name
return new_data
def deriv(self):
self.df = deriv.deriv(self.df, spect_label=self.spect_label)
def dim_red(self, col, method, params, kws, load_fit, ycol=None):
self.df, self.do_dim_red = dim_red.dim_red(
self.df, col, method, params, kws, load_fit=load_fit, ycol=ycol
)
def interp(self, xnew):
self.df = interp.interp(self.df, xnew, spect_label=self.spect_label)
def shift(self, shift):
self.df = shift_spect.shift_spect(self.df, shift, spect_label=self.spect_label)
def mask(self, maskfile, maskvar):
self.df = mask.mask(self.df, maskfile, maskvar=maskvar)
def multiply_vector(self, vectorfile):
self.df = multiply_vector.multiply_vector(
self.df, vectorfile=vectorfile, spect_label=self.spect_label
)
def norm(self, ranges, col_var):
self.df = norm.norm(self.df, ranges, col_var=col_var)
def outlier_identify(self, col, method, params):
self.df = outlier_identify.outlier_identify(
self.df, col=col, method=method, params=params
)
def endmember_identify(self, col, method, n_endmembers):
self.df, indices = endmember_identify.emi(
self.df, col=col, emi_method=method, n_endmembers=n_endmembers
)
def peak_area(self, peaks_mins_file):
self.df, self.peaks, self.mins = peak_area.peak_area(
self.df, peaks_mins_file=peaks_mins_file, spect_label=self.spect_label
)
self.spect_label = "peak_area"
def random_folds(self, nfolds):
self.df = folds.random(self.df, nfolds, meta_label=self.meta_label)
def remove_baseline(self, method, segment, params):
self.df, self.df_baseline = remove_baseline.remove_baseline(
self.df,
method=method,
segment=segment,
params=params,
spect_label=self.spect_label,
)
def stratified_folds(self, nfolds, col, tiebreaker, comp_label="comp"):
self.df = folds.stratified_folds(
self.df,
nfolds=nfolds,
sortby=(comp_label, col),
tiebreaker=(comp_label, tiebreaker),
)
def enumerate_duplicates(self, col):
self.df = enumerate_duplicates(self.df, col=col)
def scale(self, df_to_fit=None):
self.df, self.scaler = scale.do_scale(
self.df, df_to_fit, spect_label=self.spect_label
)
def unmix(self, endmembers_df, method, params, normalize):
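        """Unmix this dataset's spectra, using the rows of endmembers_df
        flagged with endmembers == 1 as the endmember spectra. Returns the
        unmixing results (wrapper around libpyhat.Unmixing.unmix)."""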
endmembers = endmembers_df.iloc[
np.squeeze(np.array((endmembers_df["endmembers"] == 1))), :
]
results = unmix.unmix(
np.array(self.df[self.spect_label]),
endmembers[self.spect_label],
method,
params=params,
normalize=normalize,
)
return results
def lookup(self, lookupdata, left_on, right_on):
self.df = lookup(
self.df, lookupdf=lookupdata, left_on=left_on, right_on=right_on
)
def remove_rows(self, matching_values):
self.df = remove_rows(self.df, matching_values, spect_label=self.spect_label)
def closest_wvl(self, input_wvls):
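        """For each wavelength in input_wvls, return the closest wavelength
        actually present in this dataset's spectral columns."""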
wvls = self.df[self.spect_label].columns.values
output_wvls = []
for w in input_wvls:
idx = (np.abs(wvls - w)).argmin()
output_wvls.append(wvls[idx])
return output_wvls
def remove_unnamed(self):
# Handle unnamed columns from the input data by removing them
colmask = self.df.columns.levels[0].str.match("Unnamed")
        if colmask.any():
            print("Removing unnamed columns:")
            print(self.df.columns.levels[0][colmask])
            good_data = []
            for c in self.df.columns.levels[0][~colmask]:
                data_tmp = self.df[c]
                data_tmp.columns = pd.MultiIndex.from_tuples(
                    [(c, col) for col in data_tmp.columns.values]
                )
                good_data.append(data_tmp)
            self.df = pd.concat(good_data, axis=1)
def remove_duplicates(self):
        try:
            # Remove duplicate/malformed wvl columns. Duplicated wavelength
            # columns are typically mangled on read (e.g. '585.2.1') and so
            # fail the float conversion below.
            data_wvl = self.df[self.spect_label]
            data_no_wvl = self.df.drop(columns=self.spect_label)
            good_wvls = []
            for i in data_wvl.columns:
                try:
                    i = float(i)
                    good_wvls.append(True)
                except (ValueError, TypeError):
                    print("Removing column " + str(i))
                    good_wvls.append(False)
            data_wvl = data_wvl.iloc[:, good_wvls]
            data_wvl.columns = pd.MultiIndex.from_tuples(
                [(self.spect_label, float(i)) for i in data_wvl.columns]
            )
            self.df = pd.merge(
                data_no_wvl, data_wvl, left_index=True, right_index=True
            )
        except (KeyError, TypeError):
            # Nothing to do if there is no spectral block
            pass
"""
def m3_params(self, paramname=None):
if paramname is not None:
if paramname == "R540":
m3.r540(self)
elif paramname == "R750":
m3.r750(self)
elif paramname == "R1580":
m3.r1580(self)
elif paramname == "R2780":
m3.r2780(self)
elif paramname == "VISNIR":
m3.visnir(self)
elif paramname == "R950_750":
m3.r950_750(self)
elif paramname == "2um_Ratio":
m3.twoum_ratio(self)
elif paramname == "Thermal_Ratio":
m3.thermal_ratio(self)
elif paramname == "Vis_Slope":
m3.visslope(self)
elif paramname == "1um_Slope":
m3.oneum_slope(self)
elif paramname == "2um_Slope":
m3.twoum_slope(self)
elif paramname == "BD620":
m3.bd620(self)
elif paramname == "BD950":
m3.bd950(self)
elif paramname == "BD1050":
m3.bd1050(self)
elif paramname == "BD1250":
m3.bd1250(self)
elif paramname == "BD3000":
m3.bd3000(self)
elif paramname == "BD1900":
m3.bd1900(self)
elif paramname == "BD2300":
m3.bd2300(self)
elif paramname == "BDI1000":
m3.bdi1000(self)
elif paramname == "BDI2000":
m3.bdi2000(self)
elif paramname == "OLINDEX":
m3.olindex(self)
elif paramname == "1um_min":
m3.oneum_min(self)
elif paramname == "1um_FWHM":
m3.oneum_fwhm(self)
elif paramname == "1um_symmetry":
m3.oneum_sym(self)
elif paramname == "BD1um_ratio":
m3.bd1um_ratio(self)
elif paramname == "BD2um_ratio":
m3.bd2um_ratio(self)
else:
                print(paramname + " is not recognized as an M3 summary parameter!")
def crism_params(self, paramname=None):
if paramname is not None:
if paramname == "R440":
crism.r440(self)
elif paramname == "R530":
crism.r530(self)
elif paramname == "R600":
crism.r600(self)
elif paramname == "R770":
crism.r770(self)
elif paramname == "R1080":
crism.r1080(self)
elif paramname == "R1300":
crism.r1300(self)
elif paramname == "R1330":
crism.r1330(self)
elif paramname == "R1506":
crism.r1506(self)
elif paramname == "R2529":
crism.r2529(self)
elif paramname == "R3920":
crism.r3920(self)
elif paramname == "Red/Blue Ratio":
crism.rbr(self)
elif paramname == "BD530":
crism.bd530_2(self)
elif paramname == "BD640":
crism.bd640_2(self)
elif paramname == "BD860":
crism.bd860_2(self)
elif paramname == "BD920":
crism.bd920_2(self)
elif paramname == "BD1300":
crism.bd1300(self)
elif paramname == "BD1400":
crism.bd1400(self)
elif paramname == "BD1435":
crism.bd1435(self)
elif paramname == "BD1500":
crism.bd1500_2(self)
elif paramname == "BD1750":
crism.bd1750_2(self)
elif paramname == "BD1900":
crism.bd1900_2(self)
elif paramname == "BD1900r2":
crism.bd1900r2(self)
elif paramname == "BD2190":
crism.bd2190(self)
elif paramname == "BD2190":
crism.bd2190(self)
elif paramname == "BD2100":
crism.bd2100_2(self)
elif paramname == "BD2165":
crism.bd2165(self)
elif paramname == "BD2210":
crism.bd2210_2(self)
elif paramname == "BD2230":
crism.bd2230(self)
elif paramname == "BD2250":
crism.bd2250(self)
elif paramname == "BD2265":
crism.bd2265(self)
elif paramname == "BD2290":
crism.bd2290(self)
elif paramname == "BD2355":
crism.bd2355(self)
elif paramname == "BD2500h":
crism.bd2500h_2(self)
elif paramname == "BD2600":
crism.bd2600(self)
elif paramname == "BD3000":
crism.crism_bd3000(self)
elif paramname == "BD3100":
crism.bd3100(self)
elif paramname == "BD3200":
crism.bd3200(self)
elif paramname == "BD3400":
crism.bd3400_2(self)
elif paramname == "BDI1000VIS":
crism.bdi1000VIS(self)
elif paramname == "BDI1000IR":
crism.bdi1000IR(self)
elif paramname == "BDI2000":
crism.crism_bdi2000(self)
elif paramname == "SH600":
crism.sh600_2(self)
elif paramname == "SH770":
crism.sh770(self)
elif paramname == "SINDEX2":
crism.sindex2(self)
elif paramname == "CINDEX2":
crism.cindex2(self)
elif paramname == "RPEAK1":
crism.rpeak1(self)
elif paramname == "OLINDEX3":
crism.olivine_index3(self)
elif paramname == "LCPINDEX2":
crism.lcp_index2(self)
elif paramname == "HCPINDEX2":
crism.hcp_index2(self)
elif paramname == "ISLOPE1":
crism.islope1(self)
elif paramname == "ICER1_2":
crism.icer1_2(self)
elif paramname == "DOUB2200H":
crism.doub2200h(self)
elif paramname == "MIN2200":
crism.min2200(self)
elif paramname == "D2200":
crism.d2200(self)
elif paramname == "MIN2250":
crism.min2250(self)
elif paramname == "D2300":
crism.d2300(self)
elif paramname == "MIN2295_2480":
crism.min2295_2480(self)
elif paramname == "MIN2345_2537":
crism.min2345_2537(self)
elif paramname == "IRR1":
crism.irr1(self)
elif paramname == "IRR2":
crism.irr2(self)
elif paramname == "IRR3":
crism.irr3(self)
else:
                print(paramname + " is not recognized as a CRISM summary parameter!")
"""
    # Sometimes (e.g. when loading data) we end up with spectra containing
    # NaNs. This removes any spectrum with a NaN.
def remove_empty_spectra(self):
        nan_mask = self.df[self.spect_label].isna().any(axis=1)
        if nan_mask.any():
            print(
                str(np.sum(nan_mask)) + " spectra containing NaNs "
                "identified! These will be removed."
            )
        self.df = self.df.iloc[np.array(~nan_mask), :]
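

# A minimal usage sketch (hypothetical data: the sample names, wavelengths,
# and intensity/composition values below are illustrative only, not part of
# PyHAT or its test data):
if __name__ == "__main__":
    demo_columns = pd.MultiIndex.from_tuples(
        [("meta", "target_name"), ("wvl", 585.2), ("wvl", 585.4),
         ("comp", "SiO2 [wt%]")]
    )
    demo_df = pd.DataFrame(
        [["Sample_A", 1204.7, 1310.2, 48.3],
         ["Sample_B", 1189.5, np.nan, 51.0]],
        columns=demo_columns,
    )
    demo = SpectralData(demo_df, name="demo")
    print(demo.closest_wvl([585.0]))  # nearest available wavelength -> [585.2]
    demo.remove_empty_spectra()  # drops Sample_B, whose spectrum contains a NaN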