# -*- coding: utf-8 -*-
"""
Created on Tue Jan 05 07:55:56 2016
@author: Suhas Somnath, Chris Smith, Rama K. Vasudevan
"""
from __future__ import division, print_function, absolute_import, unicode_literals
from os import path
from warnings import warn
import h5py
import matplotlib.pyplot as plt
import numpy as np
import xlrd as xlreader
from sidpy.hdf.hdf_utils import write_simple_attrs, link_h5_objects_as_attrs, \
get_attr, get_auxiliary_datasets
from pyUSID.processing.comp_utils import get_available_memory, parallel_compute
from pyUSID.io.hdf_utils import find_dataset, create_indexed_group, \
write_main_dataset, get_unit_values
from pyUSID import Dimension
from pyUSID.io.anc_build_utils import create_spec_inds_from_vals
from .histogram import build_histogram
from ...analysis.utils.be_sho import SHOestimateGuess
from ...viz.be_viz_utils import plot_1d_spectrum, plot_2d_spectrogram, \
plot_histograms
nf32 = np.dtype({'names': ['super_band', 'inter_bin_band', 'sub_band'],
'formats': [np.float32, np.float32, np.float32]})
[docs]
def infer_bipolar_triangular_fraction_phase(slopes):
"""
Infers the VS cycle fraction and phase when parameters were
stored in old mat files
Parameters
--------------------
slopes : list / tuple
Array of mean slopes of each fraction of a SINGLE cycle
Returns
--------------------
tuple:
fraction : float
Fraction of VS cycle
phase : float
Phase offset for VS cycle
"""
if all([_ > 0 for _ in slopes]):
return 0.25, 0
elif all([_ < 0 for _ in slopes]):
return 0.25, 0.75
elif all([_ > 0 for _ in slopes[:2]]) and all(
[_ < 0 for _ in slopes[2:]]):
return 0.5, 0
elif all([_ < 0 for _ in slopes[:2]]) and all(
[_ > 0 for _ in slopes[2:]]):
return 0.5, 0.5
elif all([_ > 0 for _ in slopes[:1]]) and all(
[_ < 0 for _ in slopes[1:]]):
return 0.75, 0
elif all([_ > 0 for _ in slopes[:3]]) and all(
[_ < 0 for _ in slopes[3:]]):
return 0.75, 0.25
elif all([_ < 0 for _ in slopes[:1]]) and all(
[_ > 0 for _ in slopes[1:]]):
return 0.75, 0.5
elif all([_ < 0 for _ in slopes[:3]]) and all(
[_ > 0 for _ in slopes[3:]]):
return 0.75, 0.75
elif slopes[0] > 0 and slopes[1] < 0 and slopes[2] < 0 and slopes[3] > 0:
return 1, 0
elif slopes[0] < 0 and slopes[1] > 0 and slopes[2] > 0 and slopes[3] < 0:
return 1, 0.5
else:
return 0, 0
[docs]
def flat_parm_dict_to_nested(parm_dict):
"""
Creates a nested dictionary of parameters for Band EXcitation from the
standard flat dictionary
Parameters
----------
parm_dict : dict
Dictionary with parameters
Returns
-------
nest_parm_dict : dict
Nested dictionary of the same parameters
"""
keys = list(parm_dict.keys())
keys.sort()
"""
for key in keys:
print('{} : {}'.format(key, main_dsets[0].parent.parent.attrs[key]))
"""
nest_parm_dict = dict()
for key in ['FORC', 'VS', 'grid', 'BE', 'IO', 'File', 'Misc']:
nest_parm_dict[key] = dict()
for key in keys:
parts = key.split('_')
parent = 'Misc'
rem_key = key
if len(parts) > 1:
if parts[0] in nest_parm_dict.keys():
parent = parts[0]
rem_key = '_'.join(parts[1:])
nest_parm_dict[parent].update(
{rem_key: parm_dict[key]})
return nest_parm_dict
[docs]
def remove_non_exist_spec_dim_labs(h5_spec_inds, h5_spec_vals,
h5_meas_grp, verbose=False):
"""
Removes non-existent spectroscopic dimension name and units from
attributes of spectroscopic datasets.
Notes
-----
This was written mainly to clean up
after the buggy Labview HDF5 acquisition software. This will be used in
the LabviewHDF5Patcher Translator
Parameters
----------
h5_spec_inds : h5py.Dataset
Dataset containing the spectroscopic indices
h5_spec_vals : h5py.Dataset
Dataset containing the spectroscopic values
h5_meas_grp : h5py.Group
Group containing all the parameters for the BE measurement
verbose : bool, optional. Default = False
Whether or not to print statements aiding in debugging
Returns
-------
None
"""
for obj, name, exp_type in zip([h5_spec_inds, h5_spec_vals, h5_meas_grp],
['h5_spec_inds', 'h5_spec_vals', 'h5_meas_grp'],
[h5py.Dataset, h5py.Dataset, h5py.Group]):
if not isinstance(obj, exp_type):
raise TypeError('{} should be a {}. Provided object was: {}'
''.format(name, exp_type, type(obj)))
spec_dim_names = get_attr(h5_spec_inds, 'labels')
spec_dim_units = get_attr(h5_spec_inds, 'units')
if len(spec_dim_names) != len(spec_dim_units):
raise ValueError('Unqeual lengths for the spec dim names and units')
if len(spec_dim_names) <= h5_spec_inds.shape[0]:
return
if verbose:
print(
'extra dimensions in the list of names attributes: {} compared to '
'rows in dataset: {}'
''.format(len(spec_dim_names), h5_spec_inds.shape[0]))
if len(spec_dim_names) - h5_spec_inds.shape[0] > 1:
raise NotImplementedError(
'Cannot handle case when more than one dimensions are fake')
# Gather basic parameters for each dimension
# h5_meas_grp = h5_raw.parent.parent
field_type = get_attr(h5_meas_grp, 'VS_measure_in_field_loops')
num_freq_bins = int(get_attr(h5_meas_grp, 'num_bins') /
get_attr(h5_meas_grp, 'num_UDVS_steps'))
num_fields = 1 + int(all([targ in field_type for targ in ['in', 'out']]))
dc_off_steps = get_attr(h5_meas_grp, 'VS_steps_per_full_cycle')
num_cycles = get_attr(h5_meas_grp, 'VS_number_of_cycles')
try:
num_forc_cycles = get_attr(h5_meas_grp, 'VS_num_of_FORC_cycles')
except KeyError:
# This is named differently in some h5 files
num_forc_cycles = get_attr(h5_meas_grp, 'VS_num_of_Forc_cycles')
size_dict = {'Frequency': num_freq_bins,
'DC_Offset': dc_off_steps,
'Field': num_fields,
'Cycle': num_cycles,
'FORC': num_forc_cycles,
}
if verbose:
for key, val in size_dict.items():
print('{} : {}'.format(key, val))
matched_dims = list()
matched_units = list()
row_ind = 0
for dim_name, dim_units in zip(spec_dim_names, spec_dim_units):
# Pass one row at a time:
this_dict = get_unit_values(np.expand_dims(h5_spec_inds[row_ind],
axis=0),
np.expand_dims(h5_spec_vals[row_ind],
axis=0),
is_spec=True, all_dim_names=[dim_name])
std_dim_name = dim_name
if dim_name == 'FORC_Cycle':
std_dim_name = 'FORC'
if verbose:
print(dim_name, len(this_dict[dim_name]), size_dict[std_dim_name])
if len(this_dict[dim_name]) == size_dict[std_dim_name]:
row_ind += 1
matched_dims.append(dim_name)
matched_units.append(dim_units)
else:
if verbose:
print(
'\tDimension: "' + dim_name + '" did not match with what '
'was in dataset')
if verbose:
print('Writing new attributes to spectroscopic datasets')
new_attrs = {'labels': matched_dims, 'units': matched_units}
for h5_dset in [h5_spec_inds, h5_spec_vals]:
write_simple_attrs(h5_dset, new_attrs)
[docs]
def parmsToDict(filepath, parms_to_remove=[]):
"""
Translates the parameters in the text file into a dictionary.
Also indentifies whether this is a BEPS or BELine dataset.
Parameters
-----------
filepath : String / Unicode
Absolute path of the parameters text or spreadsheet file.
parms_to_remove : List of string (Optional)
keys that this function should attempt to remove from the dictionary
Returns
----------
isBEPS : Boolean
whether this dataset is BEPS or BE Line.
parm_dict : Dictionary
experimental parameters
"""
verbose = False
lines = list()
if filepath.lower().endswith('.txt'):
file_handle = open(filepath, 'r')
raw_lines = file_handle.readlines()
file_handle.close()
for line in raw_lines:
line = line.rstrip()
if len(line) > 0:
lines.append(line.split(" : "))
elif filepath.lower().endswith('.xls') or filepath.lower().endswith('.xlsx'):
workbook = xlreader.open_workbook(filepath)
worksheet = workbook.sheet_by_index(0)
for row in range(worksheet.nrows):
temp = list()
for col in range(worksheet.ncols):
try:
val = str(worksheet.cell(row, col).value).strip()
if len(val) > 0:
temp.append(val)
except ValueError:
pass
except:
raise
lines.append(temp)
else:
warn('Parameter file not of expected format: text or spreadsheet')
return None
if verbose:
print("Finished reading the file")
is_beps = False
parm_dict = dict()
prefix = 'File_'
for fields in lines:
# Ignore the parameters describing the GUI choices
if prefix == 'Multi_':
continue
# Check if the line is a group header or parameter/value pair
if len(fields) == 2:
# Get the current name/value pair, and clean up the name
name = fields[0].strip().replace('# ', 'num_').replace('#', 'num_').replace(' ', '_')
value = fields[1]
# Rename specific parameters
if name == '1_mode':
name = 'mode'
if name == 'IO_rate':
name = 'IO_rate_[Hz]'
try:
value = int(value.split()[0]) * 1E6
except ValueError:
# very rare
# let it be a string
pass
if name == 'AO_range':
name = 'AO_range_[V]'
value = ' '.join(value.split()[:-1])
if name == 'AO_amplifier':
value = value.split()[0]
if name == 'cycle_time[s]':
name = 'cycle_time_[s]'
if name == 'FORC_V_high1[V]':
name = 'FORC_V_high1_[V]'
if name == 'FORC_V_high2[V]':
name = 'FORC_V_high2_[V]'
if name == 'FORC_V_low1[V]':
name = 'FORC_V_low1_[V]'
if name == 'FORC_V_low2[V]':
name = 'FORC_V_low2_[V]'
if name == 'amplitude[V]':
name = 'amplitude_[V]'
if name == 'offset[V]':
name = 'offset_[V]'
if name == 'set_pulst_amplitude[V]':
name = 'set_pulst_amplitude_[V]'
if name == 'set_pulst_duration[s]':
name = 'set_pulst_duration_[s]'
if name == 'step_edge_smoothing[s]':
name = 'step_edge_smoothing_[s]'
# Append the prefix to the name
name = prefix + name.lstrip(prefix)
# Write parameter to parm_dict
try:
num = float(value)
parm_dict[name] = num
if num == int(num):
parm_dict[name] = int(num)
except ValueError:
parm_dict[name] = value
except:
raise
elif len(fields) == 1:
# Change the parameter prefix to the new one from the group header
prefix = fields[0].strip('<').strip('>')
prefix = prefix.split()[0] + '_'
# Check if there are VS Parameters. Set isBEPS to true if so.
is_beps = is_beps or prefix == 'VS_'
if is_beps:
useless_parms = \
['Multi_1_BE_response_spectra',
'Multi_2_BE_amplitude_spectra',
'Multi_3_BE_phase_spectra',
'Multi_4_BE_response_spectrogram',
'Multi_5_BE_amplitude_spectrogram',
'Multi_6_BE_phase_spectrogram',
'Multi_7_VS_response_loops',
'Multi_8_VS_amplitude_loops',
'Multi_9_VS_phase_loops',
'Multi_10_topography',
'Multi_11_mean_channel_2']
else:
useless_parms = \
['Multi_1_BE_response_spectra',
'Multi_2_BE_amplitude_spectra',
'Multi_3_BE_phase_spectra',
'Multi_4_BE_response_spectrogram',
'Multi_5_BE_amplitude_spectrogram',
'Multi_6_BE_phase_spectrogram',
'Multi_7_amplitude_map',
'Multi_8_resonance_map',
'Multi_9_Q_map',
'Multi_10_phase_map',
'Multi_11_topography',
'Multi_12_AI_time_domain',
'Multi_13_AI_Fourier_amplitude']
if verbose:
print("Finished parsing the text pairs. isBEPS = {}".format(is_beps))
useless_parms.extend(parms_to_remove)
# Now remove the list of useless parameters:
for uparm in useless_parms:
try:
del parm_dict[uparm]
except KeyError:
# warn('Parameter to be deleted does not exist')
pass
except:
raise
del uparm, useless_parms
if verbose:
print("Finished removing useless parameters")
if is_beps:
# fix the DC type in the parms:
if parm_dict['VS_measure_in_field_loops'] == 'out-of-field only':
parm_dict['VS_measure_in_field_loops'] = 'out-of-field'
elif parm_dict['VS_measure_in_field_loops'] == 'in-field only':
parm_dict['VS_measure_in_field_loops'] = 'in-field'
return is_beps, parm_dict
###############################################################################
[docs]
def requires_conjugate(chosen_spectra, default_q=10, cores=None):
"""
Determines whether or not the conjugate of the data needs to be taken based on the quality factor
Parameters
----------
chosen_spectra : 2D complex numpy array
N random spectra arranged as [instance, frequency]
default_q : unsigned int, Optional
Default value of Q factor that the SHO guess function results in for poor guesses
cores : uint, Optional
Number of CPU cores to use for the SHO guess.
Default = None - ask pyUSID.processing.comp_utils.recommend_cpu_cores for automatic assignment
Returns
-------
do_conjugate : Boolean
Whether or not to take the conjugate of the data
"""
# Do the SHO Guess for each of these
sho_guess = parallel_compute(chosen_spectra, SHOestimateGuess, cores=cores,
func_args=[np.arange(chosen_spectra.shape[1]),
5])
q_results = np.array(sho_guess)[:, 2]
good_q = q_results[np.where(q_results != default_q)]
if np.mean(good_q) < 0:
return True
else:
return False
###############################################################################
[docs]
def getSpectroscopicParmLabel(expt_type):
"""
Returns the label for the spectroscopic parameter in the plot group.
Parameters
----------
expt_type : str
Type of the experiment - found in the parms.txt file
Returns
-------
str
label for the spectroscopic parameter axis in the plot
"""
if expt_type in ['DC modulation mode', 'current mode']:
return 'DC Bias'
elif expt_type == 'AC modulation mode with time reversal':
return 'AC amplitude'
return 'User Defined'
[docs]
def normalizeBEresponse(spectrogram_mat, FFT_BE_wave, harmonic):
"""
This function normalizes the BE waveform to correct the phase by diving by the excitation
Parameters
------------
spectrogram_mat : 2D complex numpy array
BE response arranged as [bins, steps]
FFT_BE_wave : 1D complex numpy array
FFT of the BE waveform at the appropriate bins. Number of bins must match with spectrogram_mat
harmonic : unsigned int
nth harmonic of the excitation waveform
Returns
----------
spectrogram_mat : 2D complex numpy array
Normalized BE response spectrogram
"""
BE_wave = np.fft.ifftshift(np.fft.ifft(FFT_BE_wave))
scaling_factor = 1
if harmonic == 2:
scaling_factor = np.fft.fftshift(np.fft.fft(BE_wave ** 2)) / (2 * np.exp(1j * 3 * np.pi * 0.5))
elif harmonic == 3:
scaling_factor = np.fft.fftshift(np.fft.fft(BE_wave ** 3)) / (4 * np.exp(1j * np.pi))
elif harmonic >= 4:
print("Warning these high harmonics are not supported in translator.")
# Generate transfer functions
F_AO_spectrogram = np.transpose(np.tile(FFT_BE_wave / scaling_factor, [spectrogram_mat.shape[1], 1]))
# Divide by transfer function
spectrogram_mat = spectrogram_mat / F_AO_spectrogram
return spectrogram_mat
[docs]
def generatePlotGroups(h5_main, mean_resp, folder_path, basename, max_resp=[], min_resp=[],
max_mem_mb=1024, spec_label='None',
show_plots=True, save_plots=True, do_histogram=False,
debug=False):
"""
Generates the spatially averaged datasets for the given raw dataset.
The averaged datasets are necessary for quick visualization of the quality of data.
Parameters
----------
h5_main : H5 reference
to the main dataset
mean_resp : 1D numpy array
spatially averaged amplitude
folder_path : String
Absolute path of the data folder
basename : String
base name of the dataset
max_resp : 1D numpy array
Maximum amplitude for all pixels
min_resp : 1D numpy array
Minimum amplitude for all pixels
max_mem_mb : Unisigned integer
Maximum memory that can be used for generating histograms
spec_label : String
Parameter that is varying
show_plots : (optional) Boolean
Whether or not to show plots
save_plots : (optional) Boolean
Whether or not to save generated plots
do_histogram : Boolean (Optional. Default = False)
Whether or not to generate hisograms.
Caution - Histograms can take a fair amount of time to compute.
debug : Boolean, Optional
If True, then extra debug statements are printed.
Default False
"""
# Too
assert isinstance(h5_main, h5py.Dataset)
h5_f = h5_main.file
grp = h5_main.parent
h5_freq = grp['Bin_Frequencies']
UDVS = grp['UDVS']
spec_inds = h5_main.h5_spec_inds
UDVS_inds = grp['UDVS_Indices']
spec_vals = h5_main.h5_spec_vals
spec_dim_labs = list(get_attr(spec_inds, 'labels'))
# std_cols = ['wave_type','Frequency','DC_Offset','wave_mod','AC_Amplitude','dc_offset','ac_amplitude']
col_names = list(get_attr(UDVS, 'labels'))
if len(col_names) <= 5:
"""
No plot groups are defined in the UDVS table.
All plot group datasets will be written to the
Channel group
"""
return
# Removing the standard columns
col_names = list(get_attr(UDVS, 'labels')[5:])
# col_names = [col for col in col_names if col not in std_cols + ignore_plot_groups]
freq_inds = spec_inds[spec_dim_labs.index('Frequency')].flatten()
for col_name in col_names:
# Make sure the column name is a regular Python string
col_name = str(col_name)
ref = UDVS.regionref[:, col_names.index(col_name)]
# ref = UDVS.attrs[col_name]
# Make sure we're actually dealing with a reference of some type
if not isinstance(ref, h5py.RegionReference):
continue
# 4. Access that column of the data through region reference
steps = np.where(np.isfinite(UDVS[ref]))[0]
udvs_inds = UDVS_inds[()]
step_inds = np.array([np.where(udvs_inds == step)[0] for step in steps]).flatten()
if step_inds.dtype != np.int:
# dtype = Object
warn('step indices looked odd. Trying to fix..', UserWarning)
# Every alternate element in the array is empty.
temp = [item for item in step_inds if len(item) > 0]
step_inds = np.array(temp)
step_inds = step_inds.flatten()
"""selected_UDVS_steps = UDVS[ref]
selected_UDVS_steps = selected_UDVS_steps[np.isfinite(selected_UDVS_steps)]"""
(step_averaged_vec, mean_spec) = reshape_mean_data(spec_inds, step_inds, mean_resp)
"""
Need to account for cases with multiple excitation waveforms
This will affect the frequency indices / values
We are assuming that there is only one excitation waveform per plot group
"""
freq_slice = np.unique(freq_inds[step_inds])
freq_vec = h5_freq[()][freq_slice]
num_bins = len(freq_slice) # int(len(freq_inds)/len(UDVS[ref]))
pg_data = np.repeat(UDVS[ref], num_bins)
plot_grp = create_indexed_group(grp, 'Spatially_Averaged_Plot_Group')
write_simple_attrs(plot_grp, {'Name': col_name})
h5_mean_spec = plot_grp.create_dataset('Mean_Spectrogram',
data=mean_spec,
dtype=np.complex64)
h5_step_avg = plot_grp.create_dataset('Step_Averaged_Response',
data=step_averaged_vec,
dtype=np.complex64)
# cannot assume that this is DC offset, could be AC amplitude....
h5_spec_parm = plot_grp.create_dataset('Spectroscopic_Parameter',
data=np.squeeze(pg_data[step_inds]),
dtype=np.uint32)
write_simple_attrs(h5_spec_parm, {'name': spec_label})
h5_freq_vec = plot_grp.create_dataset('Bin_Frequencies',
data=freq_vec,
dtype=h5_freq.dtype)
# Linking the datasets with the frequency and the spectroscopic variable:
link_h5_objects_as_attrs(h5_mean_spec, [h5_spec_parm, h5_freq_vec])
link_h5_objects_as_attrs(h5_step_avg, [h5_freq_vec])
"""
Create Region Reference for the plot group in the Raw_Data, Spectroscopic_Indices
and Spectroscopic_Values Datasets
"""
raw_ref = h5_main.regionref[:, step_inds]
spec_inds_ref = spec_inds.regionref[:, step_inds]
spec_vals_ref = spec_vals.regionref[:, step_inds]
ref_name = col_name.replace(' ', '_').replace('-', '_') + '_Plot_Group'
h5_main.attrs[ref_name] = raw_ref
spec_inds.attrs[ref_name] = spec_inds_ref
spec_vals.attrs[ref_name] = spec_vals_ref
h5_f.flush()
if do_histogram:
"""
Build the histograms for the current plot group
"""
hist = BEHistogram()
hist_mat, hist_labels, hist_indices, hist_indices_labels = \
hist.buildPlotGroupHist(h5_main, step_inds, max_response=max_resp,
min_response=min_resp, max_mem_mb=max_mem_mb, debug=debug)
hist_grp = create_indexed_group(plot_grp, 'Histogram')
hist_spec_dims = list()
hist_units = ['V', '', 'V', 'V']
for hist_ind, hist_dim in enumerate(hist_labels):
hist_spec_dims.append(Dimension(hist_dim,
hist_units[hist_ind],
hist_indices[hist_ind]))
h5_hist = write_main_dataset(hist_grp, hist_mat, 'Histograms',
'Counts', 'a.u.',
None, hist_spec_dims,
h5_pos_inds=h5_main.h5_pos_inds, h5_pos_vals=h5_main.h5_pos_vals,
dtype=np.int32,
chunking=(1, hist_mat.shape[1]),
compression='gzip')
else:
"""
Write the min and max response vectors so that histograms can be generated later.
"""
h5_max_resp = plot_grp.create_dataset('Max_Response',
data=max_resp)
h5_min_resp = plot_grp.create_dataset('Min_Response',
data=min_resp)
if save_plots or show_plots:
fig_title = '_'.join(grp.name[1:].split('/') + [col_name])
fig_1d, axes_1d = plot_1d_spectrum(step_averaged_vec, freq_vec, fig_title)
if save_plots:
path_1d = path.join(folder_path, basename + '_Step_Avg_' + fig_title + '.png')
path_2d = path.join(folder_path, basename + '_Mean_Spec_' + fig_title + '.png')
path_hist = path.join(folder_path, basename + '_Histograms_' + fig_title + '.png')
fig_1d.savefig(path_1d, format='png', dpi=300)
if mean_spec.shape[0] > 1:
fig_2d, axes_2d = plot_2d_spectrogram(mean_spec, freq_vec, title=fig_title)
if save_plots:
fig_2d.savefig(path_2d, format='png', dpi=300)
if do_histogram:
plot_histograms(hist_mat, hist_indices, grp.name, figure_path=path_hist)
if show_plots:
plt.show()
else:
plt.close('all')
# print('Generated spatially average data for group: %s' %(col_name))
print('Completed generating spatially averaged plot groups')
###############################################################################
[docs]
def reshape_mean_data(spec_inds, step_inds, mean_resp):
"""
Takes in the mean data vector and rearranges that data according to
plot group as [step number,bins]
Parameters
-----------
spec_inds : 2D numpy array
UDVS_Indices as a 2D mat
step_inds : 1D numpy array or list
UDVS step indices corresponding to this plot group
mean_resp : 1D numpy complex array
position averaged BE data
Returns
----------
step_averaged_vec : 1D complex numpy array
Mean (position averaged) spectrogram averaged over the UDVS steps as well
mean_spectrogram : 2D complex numpy array
Position averaged data arranged as [step number,bins]
"""
num_bins = len(np.unique(spec_inds[0, step_inds]))
# Stephen says that we can assume that the number of bins will NOT change in a plot group
mean_spectrogram = mean_resp[step_inds].reshape(-1, num_bins)
step_averaged_vec = np.mean(mean_spectrogram, axis=0)
return step_averaged_vec, mean_spectrogram
###############################################################################
[docs]
def visualize_plot_groups(h5_filepath):
"""
Visualizes the plot groups present in the provided BE data file
Parameters
----------
h5_filepath : String / Uniciode
Absolute path of the h5 file
"""
with h5py.File(h5_filepath, mode='r') as h5f:
expt_type = h5f.attrs.get('data_type')
if expt_type not in ['BEPSData', 'BELineData']:
warn('Invalid data format')
return
for grp_name in h5f.keys():
grp = h5f[grp_name]['Channel_000']
for plt_grp_name in grp.keys():
if plt_grp_name.startswith('Spatially_Averaged_Plot_Group_'):
plt_grp = grp[plt_grp_name]
if expt_type == 'BEPSData':
spect_data = plt_grp['Mean_Spectrogram'].value
_ = plot_2d_spectrogram(spect_data, plt_grp['Bin_Frequencies'].value,
title=plt_grp.attrs['Name'])
step_avg_data = plt_grp['Step_Averaged_Response']
_ = plot_1d_spectrum(step_avg_data, plt_grp['Bin_Frequencies'].value, plt_grp.attrs['Name'])
try:
hist_data = plt_grp['Histograms']
hist_bins = plt_grp['Histograms_Indicies']
plot_histograms(hist_data, hist_bins, plt_grp.attrs['Name'])
except:
pass
plt.show()
plt.close('all')
###############################################################################
[docs]
def trimUDVS(udvs_mat, udvs_labs, udvs_units, target_col_names):
"""
Removes unused (typically default) plot groups
Parameters
----------
udvs_mat : 2D numpy array
UDVS table arranged as [steps, col]
udvs_labs : list of strings
Column names of the UDVS table
udvs_units : list of strings
Units for the columns of the UDVS table
target_col_names : list of strings
Column names that need to be removed
Returns
-----------
udvs_mat : 2D numpy array
Truncated UDVS table
udvs_labs : list of strings
Truncated list of UDVS column names
udvs_units : list of strings
Truncated list of UDVS column units
"""
# Delete UDVS rows which are zeros in the very end:
# Look at the first column - UDVS steps
step = np.where(np.diff(udvs_mat[:, 0]) < 0)[0]
if len(step) > 0:
warn('Found zeros at the bottom of the UDVS table. Only keeping the '
'first {} rows'.format(step[0]))
udvs_mat = udvs_mat[:step[0] + 1]
if len(target_col_names) == 0:
return udvs_mat, udvs_labs, udvs_units
if len(udvs_labs) != udvs_mat.shape[1]:
warn('Error: Incompatible UDVS matrix and labels. Not truncating!')
return udvs_mat, udvs_labs, udvs_units
# First figure out the column indices
col_inds = []
found_cols = []
for target in target_col_names:
for ind, udvs_col in enumerate(udvs_labs):
if udvs_col == target:
col_inds.append(ind)
found_cols.append(udvs_col)
break
col_inds = np.sort(np.unique(col_inds))
# Now remove from the labels and the matrix
udvs_mat = np.delete(udvs_mat, col_inds, axis=1)
# col_inds.sort(reverse=True)
[udvs_units.pop(ind) for ind in range(len(col_inds), 0, -1)]
udvs_labs = [col for col in udvs_labs if col not in found_cols]
return udvs_mat, udvs_labs, udvs_units
[docs]
def createSpecVals(udvs_mat, spec_inds, bin_freqs, bin_wfm_type, parm_dict,
udvs_labs, udvs_units, verbose=False):
"""
This function will determine the proper Spectroscopic Value array for the
dataset
Chris Smith -- csmith55@utk.edu
Parameters
----------
udvs_mat : numpy array
UDVS table from dataset
spec_inds : numpy array
Spectroscopic Indices table from dataset
bin_freqs : numpy array
Bin frequencies
bin_wfm_type : numpy array
waveform type for each frequency index
parm_dict : dictionary
parameters for dataset
udvs_labs : list of strings
labels for the columns of the UDVS matrix
udvs_units : list of strings
units for the columns of the UDVS matrix
verbose : bool, optional. Default = False
Whether or not to print debugging print statements
Returns
-----------
ds_spec_val_mat : numpy array
Spectroscopic Values table
ds_spec_val_labs : list of strings
names of the columns of the Spectroscopic Values table
ds_spec_val_units : list of strings
units of the columns of the Spectroscopic Values table
ds_spec_val_labs_names : list of Strings
labels with the names of their parameters
"""
def __FindSpecValIndices(udvs_mat, spec_inds, usr_defined=False,
verbose=False):
"""
This function finds the Spectroscopic Values associated with the dataset that
have more than one unique value
Parameters
----------
udvs_mat : numpy array containing the UDVS table
spec_inds : numpy array contain Spectroscopic indices table
Returns
-------
iSpec_var : integer array holding column indices in UDVS that change
ds_spec_val_mat : array holding all spectral values for columns in iSpec_var
"""
udvs_cols = ['step_num', 'dc_offset', 'ac_ampli', 'wave_type',
'wave_mod', 'in-field', 'out-of-field']
if verbose:
print('\t' * 3 + '__FindSpecValIndices:')
print('\t' * 4 + 'UDVS matrix of shape: {}'.format(udvs_mat.shape))
print('\t' * 4 + 'spec_inds of shape: {}'.format(spec_inds.shape))
if False:
print('\t'.join(udvs_cols))
for ud_row in udvs_mat:
print('\t\t'.join(['{:04.2f}'.format(item) for item in ud_row]))
if False: # Turn this off if necessary
fig, axes = plt.subplots(nrows=2, figsize=(10, 5))
for ind, axis in enumerate(axes.flat):
axis.plot(spec_inds[ind, :])
axis.set_title('spec_inds[{}]'.format(ind))
fig.tight_layout()
"""
icheck is an array containing all UDVS steps which should be checked.
"""
icheck = np.unique(spec_inds[1])
if verbose and False:
print('\t' * 4 + 'UDVS steps that will be checked: {}'.format(icheck))
if len(icheck) < 1:
raise ValueError('No row in the spectroscopic indices varied.\n'
'Cannot build spectroscopic datasets further')
# Copy even step values of DC_offset into odd steps
UDVS = np.copy(udvs_mat)
if not usr_defined:
DC = UDVS[:, 1]
for step in range(0, DC.size, 2):
DC[step + 1] = DC[step]
UDVS[:, 1] = DC
"""
Keep only the UDVS values for steps which we care about and the
first 5 columns
"""
UDVS = UDVS[icheck, :5]
udvs_cols = udvs_cols[:5]
# UDVS = np.array([UDVS[i] for i in icheck])
if verbose and False:
print('\t' * 4 + 'UDVS matrix after down-selecting rows: {}'
''.format(UDVS.shape))
print('\t'.join(udvs_cols))
for ud_row in UDVS:
print('\t\t'.join(['{:04.2f}'.format(item) for item in ud_row]))
"""
Transpose UDVS for ease of looping later on and store the number of steps
as num_cols
"""
num_cols = np.size(UDVS, 1)
"""
Initialize the iSpec_var as an empty array. It will store the index of the
UDVS label for any column which has more than one unique value
"""
iSpec_var = []
"""
Loop over all columns in udvs_mat
"""
for col_ind, col_name in zip(range(1, num_cols), udvs_cols[1:]):
"""
Find all unique values in the current column
"""
toosmall = np.where(abs(UDVS[:, col_ind]) < 1E-5)[0]
UDVS[toosmall, col_ind] = 0
uvals = np.unique(UDVS[:, col_ind])
"""
np.unique considers all NaNs to be unique values
These two lines find the indices of all NaNs in the unique value array
and removes all but the first
"""
nanvals = np.where(np.isnan(uvals))[0]
if verbose:
print('\t\t\tColumn {}: {} has {} unique values and {} NaNs'
''.format(col_ind, col_name, len(uvals), len(nanvals)))
uvals = np.delete(uvals, nanvals[1:])
"""
Check if more that one unique value
Append column number to iSpec_var if true
"""
if verbose:
print('\t\t\tColumn {}: {} had {} actually unique values'
''.format(col_ind, col_name, len(uvals)))
if uvals.size > 1:
iSpec_var = np.append(iSpec_var, int(col_ind))
if verbose:
print('\t' * 4 + 'UDVS matrix of shape: {}'.format(UDVS.shape))
print('\t' * 4 + 'Taking columns specified by iSpec_var: {}'
''.format(iSpec_var))
if len(iSpec_var) < 1:
warn('No variables were varied in UDVS table! Using UDVS step as '
'the only variable', UserWarning)
return np.asarray(1, np.int), \
np.expand_dims(np.arange(UDVS.shape[0], dtype=np.int), 1)
iSpec_var = np.asarray(iSpec_var, np.int)
ds_spec_val_mat = UDVS[:, iSpec_var]
return iSpec_var, ds_spec_val_mat
def __BEPSVals(udvs_mat, spec_inds, bin_freqs, bin_wfm_type, parm_dict,
udvs_labs, udvs_units, verbose=False):
"""
Returns the Spectroscopic Value array for a BEPS dataset
Parameters
----------
udvs_mat : hdf5 dataset reference to UDVS dataset
spec_inds : numpy array containing Spectroscopic indices table
bin_freqs : 1D numpy array of frequencies
bin_wfm_type : numpy array containing the waveform type for each
frequency index
parm_dict : parameter dictinary for dataset
udvs_labs : list of labels for the columns of the UDVS matrix
udvs_units : list of units for the columns of the UDVS matrix
Returns
-------
ds_spec_val_mat : list holding final Spectroscopic Value table
"""
"""
Check the mode to determine if DC, AC, or something else
"""
mode = parm_dict['VS_mode']
if mode == 'DC modulation mode' or mode == 'current mode':
"""
First we call the FindSpecVals function to get the columns in UDVS
of interest and return a first pass on the spectral value array
"""
if verbose:
print('\t' * 3 + 'DC modulation mode / current mode')
iSpecVals, inSpecVals = __FindSpecValIndices(udvs_mat, spec_inds,
verbose=verbose)
if verbose:
print('\t' * 3 + 'Generated spec vals. Calling __BEPSDC')
return __BEPSDC(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type,
parm_dict, verbose=verbose)
elif mode == 'AC modulation mode with time reversal':
"""
First we call the FindSpecVals function to get the columns in UDVS
of interest and return a first pass on the spectral value array
"""
if verbose:
print('\t' * 3 + 'AC modulation')
iSpecVals, inSpecVals = __FindSpecValIndices(udvs_mat, spec_inds,
verbose=verbose)
if verbose:
print('\t' * 3 + 'Generated spec vals. Calling __BEPSAC')
return __BEPSAC(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type,
parm_dict, verbose=verbose)
else:
"""
First we call the FindSpecVals function to get the columns in UDVS
of interest and return a first pass on the spectral value array
"""
if verbose:
print('\t' * 3 + 'user defined voltage spectroscopy')
iSpecVals, inSpecVals = __FindSpecValIndices(udvs_mat, spec_inds,
usr_defined=True,
verbose=verbose)
if verbose:
print('\t' * 3 + 'Generated spec vals. Calling __BEPSgen')
return __BEPSgen(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type,
udvs_labs, iSpecVals, udvs_units)
def __BEPSDC(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type, parm_dict,
verbose=False):
"""
Calculates Spectroscopic Values for BEPS data in DC modulation mode
Parameters
----------
udvs_mat : hdf5 dataset reference to UDVS dataset
inSpecVals : list holding initial guess at spectral values
bin_freqs : 1D numpy array of frequencies
bin_wfm_type : numpy array containing the waveform type for each frequency index
parm_dict : parameter dictinary for dataset
verbose: bool, optional. Default = False
Whether or not to print statements for debugging
Returns
-------
ds_spec_val_mat : list holding final Spectroscopic Value table
SpecValsLabels : list holding labels of column names of ds_spec_val_mat
"""
hascycles = False
hasFORCS = False
# print('in shape:',np.shape(inSpecVals))
"""
All DC datasets will need Spectroscopic Value fields for Bin, DC, and Field
Bin - Bin number
DC - dc offset from UDVS
Field - 0 if in-field, 1 if out-of-field
"""
nrow = 2
ds_spec_val_labs = ['Frequency', 'DC_Offset']
ds_spec_val_units = ['Hz', 'V']
"""
Get the number of and steps in the current dataset
"""
numsteps = udvs_mat.shape[0]
"""
Get the wave form for each step from udvs_mat
"""
wave_form = udvs_mat[:, 3]
"""
Define list of attribute names needed from the group metadata
"""
field_type = parm_dict['VS_measure_in_field_loops']
numcycles = parm_dict['VS_number_of_cycles']
numFORCs = parm_dict['FORC_num_of_FORC_cycles']
numcyclesteps = parm_dict['VS_steps_per_full_cycle']
cycle_fraction = parm_dict['VS_cycle_fraction']
if field_type == 'in and out-of-field':
ds_spec_val_labs.append('Field')
ds_spec_val_units.append('')
nrow += 1
frac = {'full': 1.0, '1/2': 0.5, '1/4': 0.25, '3/4': 0.75}
numcyclesteps = frac[cycle_fraction] * numcyclesteps
"""
Check the number of cycles and FORCs
Add to Spectroscopic Values Labels as needed
"""
if numcycles > 1:
hascycles = True
nrow += 1
ds_spec_val_labs.append('Cycle')
ds_spec_val_units.append('')
if numFORCs > 1:
hasFORCS = True
"""
It's possible to have 1 cycle with multiple FORCs so force cycle tracking
if FROCs exist
"""
nrow += 1
ds_spec_val_labs.append('FORC')
ds_spec_val_units.append('')
numFORCsteps = numcycles * numcyclesteps * 2
"""
Check the field type
For in-field and out-of-field we know all values of field ahead of time
For in and out-of-field we must check at each step
If something else is in field_type, we default to in and out-of-field and print message
"""
if field_type == 'out-of-field':
field = 1
numsteps = int(numsteps / 2)
numcyclesteps = int(numcyclesteps / 2)
swapfield = [1, 1]
field_names = ['out-of-field']
elif field_type == 'in-field':
field = 0
numsteps = int(numsteps / 2)
numcyclesteps = int(numcyclesteps / 2)
swapfield = [0, 0]
field_names = ['in-field']
elif field_type == 'in and out-of-field':
field = 0
swapfield = [1, 0]
field_names = ['out-of-field', 'in-field']
else:
warn('{} is not a known field type'.format(field_type))
field = 0
swapfield = [1, 0]
"""
Initialize ds_spec_val_mat so that we can append to it in loop
"""
ds_spec_val_mat_2 = list()
"""
Main loop over all steps
"""
FORC = -1
cycle = -1
if verbose:
print('\t' * 4 + 'field_type: {}, hascycles: {}, hasFORCS: {}'
''.format(field_type, hascycles, hasFORCS))
print('\t' * 4 + 'Looping over {} steps'.format(numsteps))
# TODO: Make this horribly slow double for loop much faster!
for step in range(numsteps):
"""
Calculate the cycle number if needed
"""
if hasFORCS:
FORC = np.floor(step / numFORCsteps)
stepinFORC = step - FORC * numFORCsteps
cycle = np.floor(stepinFORC / numcyclesteps / 2)
elif hascycles:
cycle = np.floor(step / numcyclesteps / 2)
"""
Change field if needed
"""
field = swapfield[field]
"""
Get bins for current step based on waveform
"""
this_wave = np.where(bin_wfm_type == wave_form[step])[0]
if len(this_wave) < 1:
raise ValueError('Waveform type: {} not found in bin_wfm_type:'
' {}'.format(wave_form[step], bin_wfm_type))
# print('\t' * 6 + '{}'.format(inSpecVals[step]))
suffix = [inSpecVals[step][0]]
if field_type == 'in and out-of-field':
suffix.append(field)
if hascycles:
suffix.append(cycle)
if hasFORCS:
suffix.append(FORC)
if verbose and step == 0:
print('\t' * 5 + 'Step: {} of {}: Suffix: {}'
''.format(step, numsteps, suffix))
print('\t' * 5 + 'this_wave of shape: {}'
''.format(this_wave.shape))
"""
Loop over bins
"""
# TODO: Vectorize this loop at a minimum
for thisbin in this_wave:
col_val = [bin_freqs[thisbin]]
"""
Add entries for field, cycle and/or FORC as needed
"""
# TODO: Why not add these later as columns instead of per row?
col_val += suffix
ds_spec_val_mat_2.append(col_val)
if verbose and step == 0:
print('\t' * 5 + 'At step {} ds_spec_val_mat_2: ({}, {})'
''.format(step, len(ds_spec_val_mat_2),
len(ds_spec_val_mat_2[0])))
ds_spec_val_mat_2 = np.array(ds_spec_val_mat_2).T
if verbose:
print('\t' * 4 + 'Shape of spec val mats: {}'
''.format(ds_spec_val_mat_2.shape))
return ds_spec_val_mat_2, ds_spec_val_labs, ds_spec_val_units, [['Field', field_names]]
def __BEPSAC(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type, parm_dict,
verbose=False):
"""
Calculates Spectroscopic Values for BEPS data in AC modulation mode with time
reversal
Parameters
----------
udvs_mat : hdf5 dataset reference to UDVS dataset
inSpecVals : list holding initial guess at spectral values
bin_freqs : 1D numpy array of frequencies
bin_wfm_type : numpy array containing the waveform type for each frequency index
parm_dict : parameter dictinary for dataset
verbose: bool, optional. Default = False
Whether or not to print statements for debugging
Returns
-------
ds_spec_val_mat : list holding final Spectroscopic Value table
SpecValsLabels : list holding labels of column names of ds_spec_val_mat
"""
hascycles = False
hasFORCS = False
"""
All AC datasets will need Spectroscopic Value fields for Bin, AC, and Direction
Bin - Bin number
AC - AC amplitude from UDVS
forrev - 1 if forward, -1 if reverse
"""
nrow = 3
ds_spec_val_labs = ['Frequency', 'AC_Amplitude', 'Direction']
ds_spec_val_units = ['Hz', 'A', '']
"""
Get the number of bins and steps in the current dataset
"""
numsteps = np.shape(udvs_mat)[0]
"""
Get the wave form for each step from udvs_mat
"""
wave_form = udvs_mat[:, 3]
"""
Define list of attribute names needed from the group metadata
"""
numcycles = parm_dict['VS_number_of_cycles']
numFORCs = parm_dict['FORC_num_of_FORC_cycles']
numcyclesteps = parm_dict['VS_steps_per_full_cycle']
cycle_fraction = parm_dict['VS_cycle_fraction']
frac = {'full': 1.0, '1/2': 0.5, '1/4': 0.25, '3/4': 0.75}
numcyclesteps = frac[cycle_fraction] * numcyclesteps
"""
Check the number of cycles and FORCs
Add to Spectroscopic Values Labels as needed
"""
if numcycles > 1:
hascycles = True
nrow += 1
ds_spec_val_labs.append('Cycle')
ds_spec_val_units.append('')
if numFORCs > 1:
hasFORCS = True
nrow += 1
ds_spec_val_labs.append('FORC')
ds_spec_val_units.append('')
numFORCsteps = numcycles * numcyclesteps
"""
Initialize ds_spec_val_mat so that we can append to it in loop
"""
ds_spec_val_mat_2 = list()
"""
Main loop over all steps
"""
FORC = -1
cycle = -1
warn('Generating spectroscopic values for AC spectroscopy. This can '
'take a while. Please be patient')
for step in range(numsteps):
if verbose and False:
print('\t' * 4 + 'Working on step: {} of {}'.format(step,
numsteps))
"""
Calculate the cycle number if needed
"""
if hasFORCS:
FORC = np.floor(step / numFORCsteps)
stepinFORC = step - FORC * numFORCsteps
cycle = np.floor(stepinFORC / numcyclesteps)
elif hascycles:
cycle = np.floor(step / numcyclesteps)
"""
Check the wave_mod
"""
wmod = inSpecVals[step][1]
forrev = np.sign(wmod)
"""
Get bins for current step based on waveform
"""
this_wave = np.where(bin_wfm_type == wave_form[step])[0]
suffix = [inSpecVals[step][0], forrev]
if hascycles:
suffix.append(cycle)
if hasFORCS:
suffix.append(FORC)
if verbose and step == 0:
print('\t' * 5 + 'Step: {}: Suffix: {}'.format(step, suffix))
print('\t' * 5 + 'this_wave of shape: {}'.format(
this_wave.shape))
"""
Loop over bins
"""
# TODO: Consider vectorizing here
for thisbin in this_wave:
col_val = [bin_freqs[thisbin]]
"""
Add entries to cycle and/or FORC as needed
"""
col_val += suffix
ds_spec_val_mat_2.append(col_val)
ds_spec_val_mat_2 = np.array(ds_spec_val_mat_2).T
if verbose:
print('\t' * 4 + 'Shape of spec val mats: {}'
''.format(ds_spec_val_mat_2.shape))
return ds_spec_val_mat_2, ds_spec_val_labs, ds_spec_val_units, [['Direction', ['reverse', 'forward']]]
def __BEPSgen(udvs_mat, inSpecVals, bin_freqs, bin_wfm_type, udvs_labs, iSpecVals, udvs_units):
"""
Calculates Spectroscopic Values for BEPS data in generic mode
Parameters
----------
udvs_mat : hdf5 dataset reference to UDVS dataset
inSpecVals : list holding initial guess at spectral values
bin_freqs : 1D numpy array of frequencies
bin_wfm_type : numpy array containing the waveform type for each frequency index
udvs_labs : list of labels for the columns of the UDVS matrix
Returns
-------
ds_spec_val_mat -- list holding final Spectroscopic Value table
SpecValsLabels -- list holding labels of column names of ds_spec_val_mat
"""
"""
Get the number of bins and steps in the current dataset
"""
numsteps = udvs_mat.shape[0]
"""
Get the wave form for each step from udvs_mat
"""
wave_form = udvs_mat[:, 3]
"""
All datasets will need Spectroscopic Value fields for Bin,
everything else must be defined
Bin - Bin number
"""
ds_spec_val_labs = ['Frequency']
ds_spec_val_units = ['Hz']
ds_spec_val_labs.extend(udvs_labs[(iSpecVals[:])])
ds_spec_val_units.extend([udvs_units[i] for i in iSpecVals])
nrow = len(ds_spec_val_labs)
"""
Initialize ds_spec_val_mat so that we can append to it in loop
"""
ds_spec_val_mat = np.empty([nrow, 1])
"""
Main loop over all steps
"""
for step in range(numsteps):
"""
Get the wave form for each step from udvs_mat
"""
this_wave = np.where(bin_wfm_type == wave_form[step])[0]
for thisbin in this_wave:
colVal = np.array([[bin_freqs[thisbin]]])
colVal = np.append(colVal, [[row] for row in inSpecVals[step, :]], axis=0)
ds_spec_val_mat = np.append(ds_spec_val_mat, colVal, axis=1)
return ds_spec_val_mat[:, 1:], ds_spec_val_labs, ds_spec_val_units, []
"""
********************************************************************************************
END OF INTERNAL FUNCTION LIST
"""
dtype = parm_dict['data_type']
if dtype == 'BELineData':
if verbose:
print('\t\tWorking on BE Line data')
ds_spec_val_mat = bin_freqs.reshape([1, -1])
ds_spec_inds_mat = np.zeros_like(ds_spec_val_mat, dtype=np.int32)
ds_spec_val_labs = ['Frequency']
ds_spec_val_units = ['Hz']
spec_vals_labs_names = []
ds_spec_inds_mat[0, :] = np.arange(ds_spec_val_mat.shape[1])
elif dtype == 'BEPSData':
if verbose:
print('\t\tWorking on BEPS data. Calling __BEPSVals')
# Call BEPSVals to finish the refining of the Spectroscopic Value array
ret_vals = __BEPSVals(udvs_mat, spec_inds, bin_freqs, bin_wfm_type,
parm_dict, udvs_labs, udvs_units,
verbose=verbose)
if verbose:
print('\t\tGenerated values from BEPSVals internal function')
ds_spec_val_mat, ds_spec_val_labs, ds_spec_val_units, spec_vals_labs_names = ret_vals
mode = parm_dict['VS_mode']
# TODO: This is a very slow step - vectorize?
if ds_spec_val_mat.shape[1] > 500E+3:
warn('Please be patient while spectroscopic indices are being '
'generated')
ds_spec_inds_mat = create_spec_inds_from_vals(ds_spec_val_mat)
if verbose:
print('\t\tReturned from create_spec_inds_from_vals')
# Make sure that the frequencies reset properly for user defined case
spec_start = 0
if mode == 'load user defined VS Wave from file':
if verbose:
print('\t\tMaking sure that the frequencies reset properly '
'for user defined voltage spectroscopy case')
wave_form = udvs_mat[:, 3]
for wave in wave_form:
wave_freqs = bin_freqs[np.argwhere(bin_wfm_type == wave)].squeeze()
num_bins = wave_freqs.size
ds_spec_inds_mat[0, spec_start:spec_start + num_bins] = range(num_bins)
spec_start += num_bins
else:
warn('Unknown format! Cannot generate Spectroscopic Values!')
ds_spec_val_mat = []
ds_spec_inds_mat = []
ds_spec_val_labs = []
ds_spec_val_units = []
spec_vals_labs_names = []
return ds_spec_val_mat, ds_spec_inds_mat, ds_spec_val_labs, ds_spec_val_units, spec_vals_labs_names
"""
BEHistogram Class and Functions
"""
[docs]
class BEHistogram:
# TODO: Make into Process class
"""
Class just functions as a container so we can have shared objects
Chris Smith -- csmith55@utk.edu
"""
def __init__(self):
self.max_mem = None
self.max_response = None
self.min_response = None
self.num_udvs_steps = 1
self.N_spectral_steps = 1
self.N_bins = 1
self.N_freqs = 1
self.N_pixels = 1
self.N_y_bins = 1
[docs]
def addBEHist(self, h5_path, max_mem_mb=1024, show_plot=True, save_plot=True):
"""
This function adds Histgrams from the Main Data to the Plot Groups for
an existing hdf5 BEPS datafile.
Parameters
----------
h5_path : string
the path to the hdf5 datafile
max_mem_mb : unsigned integer
the maximum amount of memory to use during the binning
show_plot : Boolean
Should plot of the histograms be drawn after they are
created
save_plot : Boolean
Should plots of the histograms be saved
Returns
-------
None
"""
hdf = h5py.File(h5_path, mode='r+')
h5_file = hdf.file
print('Adding Histograms to file {}'.format(h5_file.name))
print('Path to HDF5 file is {}'.format(hdf.path))
max_mem = min(max_mem_mb * 1024 ** 2, 0.75 * get_available_memory())
h5_main = find_dataset(h5_file, 'Raw_Data')
h5_udvs = find_dataset(h5_file, 'UDVS')
m_groups = [data.parent for data in h5_main]
print('{} Measurement groups found.'.format(len(m_groups)))
for im, group in enumerate(m_groups):
p_groups = []
mspecs = find_dataset(group, 'Mean_Spectrogram')
p_groups.extend([mspec.parent for mspec in mspecs])
print('{} Plot groups in {}'.format(len(p_groups), group.name))
for ip, p_group in enumerate(p_groups):
try:
max_resp = find_dataset(group, 'Max_Response')
min_resp = find_dataset(group, 'Min_Response')
except:
warn('Maximum and Minimum Response vectors not found for {}.'.format(p_group.name))
max_resp = []
min_resp = []
print('Creating BEHistogram for Plot Group {}'.format(p_group.name))
udvs_lab = p_group.attrs['Name']
udvs_col = h5_udvs[im][h5_udvs[im].attrs[udvs_lab]]
actual_udvs_steps = np.where(np.isnan(udvs_col) is False)[0]
"""
Add the BEHistogram for the current plot group
"""
plot_grp = group.create_group(p_group.name.split('/')[-1])
write_simple_attrs(plot_grp, {'Name': udvs_lab})
hist = BEHistogram()
hist_mat, hist_labels, hist_indices, hist_indices_labels = \
hist.buildPlotGroupHist(h5_main[im],
actual_udvs_steps,
max_response=max_resp,
min_response=min_resp,
max_mem_mb=max_mem)
ds_hist = plot_grp.create_dataset('Histograms',
data=hist_mat,
dtype=np.int32)
write_simple_attrs(ds_hist, {'labels': hist_labels})
ds_hist_indices = plot_grp.create_dataset('Histograms_Indices',
data=hist_indices,
dtype=np.int32)
write_simple_attrs(ds_hist_indices,
{'labels': hist_indices_labels})
ds_hist_labels = plot_grp.create_dataset('Histograms_Labels',
data=hist_labels)
if show_plot or save_plot:
if save_plot:
basename, junk = path.splitext(h5_path)
plotfile = '{}_MG{}_PG{}_Histograms.png'.format(basename, im, ip)
plot_histograms(hist_mat, hist_indices, p_group, plotfile)
if show_plot:
plt.show()
hdf.close()
[docs]
def buildBEHist(self, h5_main, max_response=[], min_response=[], max_mem_mb=1024, max_bins=256, debug=False):
"""
Creates Histograms from dataset
Parameters
----------
h5_main : hdf5.Dataset
max_response : list
min_response : list
max_mem_mb : int
max_bins : int
debug : bool
Returns
-------
"""
free_mem = get_available_memory()
if debug:
print('We have {} bytes of memory available'.format(free_mem))
self.max_mem = min(max_mem_mb * 1024 ** 2, 0.75 * free_mem)
"""
Check that max_response and min_response have been defined.
Call __getminmaxresponse__ is not
"""
if max_response == [] or min_response == []:
max_response = np.max(np.abs(h5_main), axis=1)
min_response = np.min(np.abs(h5_main), axis=1)
self.max_response = np.mean(max_response) + 3 * np.std(max_response)
self.min_response = np.max([0, np.mean(min_response) - 3 * np.std(min_response)])
"""
Loop over all datasets
"""
active_udvs_steps = getActiveUDVSsteps(h5_main) # technically needs to be done only once
self.num_udvs_steps = len(active_udvs_steps)
"""
Load auxilary datasets and extract needed parameters
"""
spec_ind_mat = get_auxiliary_datasets(h5_main, aux_dset_name=['Spectroscopic_Indices'])[0]
self.N_spectral_steps = np.shape(spec_ind_mat)[0]
"""
Set up frequency axis of histogram, same for all histograms in a single dataset
"""
freqs_mat = get_auxiliary_datasets(h5_main, aux_dset_name=['Bin_Frequencies'])[0]
x_hist = np.array(spec_ind_mat)
self.N_bins = np.size(freqs_mat)
self.N_freqs = np.size(np.unique(freqs_mat))
# print('There are {} total frequencies in this dataset'.format(self.N_bins))
del freqs_mat, spec_ind_mat
self.N_pixels = np.shape(h5_main)[1]
# print('There are {} pixels in this dataset'.format(self.N_pixels))
self.N_y_bins = np.int(np.min((max_bins, np.rint(np.sqrt(self.N_pixels * self.N_spectral_steps)))))
# self.N_y_bins = np.min( (max_bins, np.rint(2*(self.N_pixels*self.N_spectral_steps)**(1.0/3.0))))
# print('{} bins will be used'.format(self.N_y_bins))
ds_hist = self.__datasetHist(h5_main, active_udvs_steps, x_hist, debug)
return ds_hist
[docs]
def buildPlotGroupHist(self, h5_main, active_spec_steps, max_response=[],
min_response=[], max_mem_mb=1024, max_bins=256,
std_mult=3, debug=False):
"""
Creates Histograms for a given plot group
Parameters
----------
h5_main : HDF5 Dataset object
Dataset to be historammed
active_spec_steps : numpy array
active spectral steps in the current plot group
max_response : numpy array
maximum amplitude at each pixel
min_response : numpy array
minimum amplitude at each pixel
max_mem_mb : Unsigned integer
maximum number of Mb allowed for use. Used to calculate the
number of pixels to load in a chunk
max_bins : integer
maximum number of spectroscopic bins
std_mult : integer
number of standard deviations from the mean of
max_response and min_response to include in
binning
debug : boolean
Turns on debug printing statements if true. Default False.
Returns
-------
hist_mat : 2d numpy array
4 histograms as 1d arrays
hist_labels : list of strings
names for the 4 rows in hist_mat
hist_indices : 2d numpy array
the frequency and spectroscopic bins of each column in hist_mat
hist_index_labels : list of strings
labels for the hist_indices array
"""
free_mem = get_available_memory()
if debug:
print('We have {} bytes of memory available'.format(free_mem))
self.max_mem = min(max_mem_mb, 0.75 * free_mem)
"""
Check that max_response and min_response have been defined.
Call __getminmaxresponse__ is not
"""
if max_response == [] or min_response == []:
max_response = np.amax(np.abs(h5_main), axis=0)
min_response = np.amin(np.abs(h5_main), axis=0)
self.max_response = np.mean(max_response) + std_mult * np.std(max_response)
self.min_response = np.mean(min_response) - std_mult * np.std(min_response)
del max_response, min_response
"""
Load auxilary datasets and extract needed parameters
"""
step_ind_mat = get_auxiliary_datasets(h5_main, aux_dset_name=['UDVS_Indices'])[0].value
spec_ind_mat = get_auxiliary_datasets(h5_main, aux_dset_name=['Spectroscopic_Indices'])[0].value
self.N_spectral_steps = np.size(step_ind_mat)
active_udvs_steps = np.unique(step_ind_mat[active_spec_steps])
self.num_udvs_steps = len(active_udvs_steps)
"""
Set up frequency axis of histogram, same for all histograms in a single dataset
"""
freqs_mat = get_auxiliary_datasets(h5_main, aux_dset_name=['Bin_Frequencies'])[0]
x_hist = np.array([spec_ind_mat[0], step_ind_mat], dtype=np.int32)
self.N_bins = np.size(freqs_mat)
self.N_freqs = np.size(np.unique(freqs_mat))
del freqs_mat, step_ind_mat, spec_ind_mat
self.N_pixels = np.shape(h5_main)[0]
# self.N_y_bins = np.int(np.min( (max_bins, np.rint(np.sqrt(self.N_pixels*self.N_spectral_steps)))))
self.N_y_bins = np.int(np.min((max_bins, np.rint(2 * (self.N_pixels * self.N_spectral_steps) ** (1.0 / 3.0)))))
ds_hist = self.__datasetHist(h5_main, active_udvs_steps, x_hist, debug)
if debug:
print(np.shape(ds_hist))
if debug:
print('ds_hist max', np.max(ds_hist), 'ds_hist min', np.min(ds_hist))
hist_mat, hist_labels, hist_indices, hist_index_labels = self.__reshapeHist(ds_hist)
return hist_mat, hist_labels, hist_indices, hist_index_labels
@staticmethod
def __reshapeHist(ds_hist):
"""
Reshape the histogram matrix into table, and build the associated index table
Parameters
----------
ds_hist : numpy array
the 4 histogram matrices
Returns
-------
hist_mat : 2d numpy array
the 4 histograms as 1d arrays
hist_labels : list of strings
names for the 4 rows in hist_mat
hist_indices : 2d numpy array
the frequency and spectroscopic bins of each column in hist_mat
hist_index_labels : list of strings
labels for the hist_indices array
"""
hist_shape = ds_hist.shape
hist_mat = np.reshape(ds_hist, (hist_shape[0], hist_shape[1] * hist_shape[2]))
hist_labels = ['Amplitude', 'Phase', 'Real Part', 'Imaginary Part']
hist_indices = np.zeros((2, hist_mat.shape[1]), dtype=np.int32)
hist_index_labels = ['Frequency Bin', 'Spectroscopic Bin']
for isbin in range(hist_shape[1]):
for ifbin in range(hist_shape[2]):
ihbin = ifbin + isbin * hist_shape[2]
hist_indices[0, ihbin] = ifbin
hist_indices[1, ihbin] = isbin
return hist_mat, hist_labels, hist_indices, hist_index_labels
def __datasetHist(self, h5_main, active_udvs_steps, x_hist, debug=False):
"""
Create the histogram for a single dataset
Parameters
----------
h5_main : HDF5 Dataset
Main_Dataset to be histogramed
active_udvs_steps : numpy array
the active udvs steps in the current plot group
x_hist : 1d numpy array
the spectroscopic indices matrix, used to find the
spectroscopic indices of each udvs step
Returns
-------
ds_hist : numpy array
the 4 histogram matrices
"""
"""
Estimate maximum number of pixels to read at once
"""
max_pixels = maxReadPixels(self.max_mem, self.N_pixels, self.num_udvs_steps,
bytes_per_bin=h5_main.dtype.itemsize * self.N_y_bins * self.N_freqs)
"""
Divide the pixels into chunks that will fit in memory
"""
pix_chunks = np.append(np.arange(0, self.N_pixels, max_pixels, dtype=np.int), self.N_pixels)
"""
Initialize the histograms
"""
ds_hist = np.zeros((4, self.N_freqs, self.N_y_bins), dtype=np.int32)
"""
loop over pixels
"""
for ichunk in range(len(pix_chunks) - 1):
if debug:
print('pixel chunk', ichunk)
chunk = range(pix_chunks[ichunk], pix_chunks[ichunk + 1])
"""
Loop over active UDVS steps
"""
for iudvs in range(self.num_udvs_steps):
selected = (iudvs + chunk[0] * self.num_udvs_steps) % np.rint(
self.num_udvs_steps * self.N_pixels / 10) == 0
if selected:
per_done = np.rint(
100 * (iudvs + chunk[0] * self.num_udvs_steps) / (self.num_udvs_steps * self.N_pixels))
print('Binning BEHistogram...{}% --pixels {}-{}, step # {}'.format(per_done, chunk[0],
chunk[-1], iudvs))
udvs_step = active_udvs_steps[iudvs]
if debug:
print('udvs step', udvs_step)
"""
Get the correct Spectroscopic bins for the current UDVS step
Read desired pixel chunk from these bins for Main_Data into data_mat
"""
udvs_bins = np.where(x_hist[1] == udvs_step)[0]
if debug:
print(np.shape(x_hist))
data_mat = h5_main[pix_chunks[ichunk]:pix_chunks[ichunk + 1], udvs_bins]
"""
Get the frequecies that correspond to the current UDVS bins from the total x_hist
"""
this_x_hist = np.take(x_hist[0], udvs_bins)
this_x_hist = this_x_hist - this_x_hist[0]
this_x_hist = np.transpose(np.tile(this_x_hist, (1, pix_chunks[ichunk + 1] - pix_chunks[ichunk])))
this_x_hist = np.squeeze(this_x_hist)
N_x_bins = np.shape(this_x_hist)[0]
if debug:
print('N_x_bins', N_x_bins)
print(this_x_hist)
print(np.shape(this_x_hist))
"""
Create weighting vector. If setting all to one value, can be a scalar.
"""
weighting_vec = 1
if debug:
print(np.shape(data_mat))
"""
Set up the list of functions to call and their corresponding maxima and minima
"""
func_list = [np.abs, np.angle, np.real, np.imag]
max_list = [self.max_response, np.pi, self.max_response, self.max_response]
min_list = [self.min_response, -np.pi, self.min_response, self.min_response]
"""
Get the Histograms and store in correct place in ds_hist
"""
for ifunc, func in enumerate(func_list):
chunk_hist = build_histogram(this_x_hist,
data_mat,
N_x_bins,
self.N_y_bins,
weighting_vec,
min_list[ifunc],
max_list[ifunc],
func,
debug)
if debug:
print('chunkhist-{}'.format(func.__name__), np.shape(chunk_hist))
print(chunk_hist.dtype)
for (i, ifreq) in enumerate(udvs_bins):
ids_freq = this_x_hist[i]
if debug:
print(i, ifreq)
print(ids_freq)
ds_hist[ifunc, ids_freq, :] = np.add(ds_hist[ifunc, ids_freq, :], chunk_hist[i, :])
return ds_hist
[docs]
def maxReadPixels(max_memory, tot_pix, bins_per_step, bytes_per_bin=4):
"""
Calculates the maximum number of pixels that can be loaded into the
specified memory size. This is particularly useful when applying a
(typically parallel) operation / processing on each pixel.
Example - Fitting response to a model.
Parameters
-------------
max_memory : unsigned int
Maximum memory (in bytes) that can be used.
For example 4 GB would be = 4*((2**10)**3) bytes
tot_pix : unsigned int
Total number of pixels in dataset
bins_per_step : unsigned int
Number of bins that will be read (can be portion of each pixel)
bytes_per_bin : (Optional) unsigned int
size of each bin - set to 4 bytes
Returns
-------
max_pix : unsigned int
Maximum number of pixels that will be loaded
"""
# alternatively try .nbytes
bytes_per_step = bins_per_step * bytes_per_bin
max_pix = np.rint(max_memory / bytes_per_step)
# print('Allowed to read {} of {} pixels'.format(max_pix,tot_pix))
max_pix = max(1, min(tot_pix, max_pix))
return np.uint(max_pix)
[docs]
def getActiveUDVSsteps(h5_raw):
"""
Returns all the active UDVS steps in the data
Parameters
----------
h5_raw : HDF5 dataset reference
Reference to the raw data
Returns
-----------
steps : 1D numpy array
Active UDVS steps
"""
udvs_step_vec = get_auxiliary_datasets(h5_raw, aux_dset_name=['UDVS_Indices'])[0].value
return np.unique(udvs_step_vec)
[docs]
def getSliceForExcWfm(h5_bin_wfm, excit_wfm):
"""
Returns the indices that correspond to the given excitation waveform
that can be used to slice the bin datasets
* Developer note - Replace the first parameter with the Raw_Data dataset
Parameters
----------------
h5_bin_wfm : Reference to HDF5 dataset
Bin Waveform Indices
excit_wfm : integer
excitation waveform / wave type
Returns
--------------
slc : slice object
Slice with the start and end indices
"""
temp = np.where(h5_bin_wfm.value == excit_wfm)[0]
return slice(temp[0], temp[-1] + 1) # Need to add one additional index otherwise, the last index will be lost
[docs]
def getDataIndicesForUDVSstep(h5_udvs_inds, udvs_step_index):
"""
Returns the spectroscopic indices that correspond to the given udvs_step_index
that can be used to slice the main data matrix.
* Developer note - Replace the first parameter with the Raw_Data dataset
Parameters
-------------
h5_udvs_inds : Reference to HDF5 dataset
UDVS_Indices dataset
udvs_step_index : usigned int
UDVS step index (base 0)
Returns
--------------
ans : 1D numpy array
Spectroscopic indices
"""
spec_ind_udvs_step_col = h5_udvs_inds[h5_udvs_inds.attrs.get('UDVS_Step')]
return np.where(spec_ind_udvs_step_col == udvs_step_index)[0]
[docs]
def getSpecSliceForUDVSstep(h5_udvs_inds, udvs_step_index):
"""
Returns the spectroscopic indices that correspond to the given udvs_step_index
that can be used to slice the main data matrix
* Developer note - Replace the first parameter with the Raw_Data dataset
Parameters
-------------
h5_udvs_inds : Reference to HDF5 dataset
UDVS_Indices dataset
udvs_step_index : unsigned int
UDVS step index (base 0)
Returns
----------
slc : slice object
Object containing the start and end indices
"""
temp = np.where(h5_udvs_inds.value == udvs_step_index)[0]
return slice(temp[0], temp[-1] + 1) # Need to add one additional index otherwise, the last index will be lost
[docs]
def getForExcitWfm(h5_main, h5_other, wave_type):
"""
Slices the provided H5 dataset by the provided wave type.
Note that this is applicable to only certain H5 datasets such as the bin frequences, bin FFT etc.
Parameters
----------
h5_main : Reference to HDF5 dataset
Raw_Data dataset
h5_other :Reference to HDF5 dataset
The dataset that needs to be sliced such as bin frequencies
wave_type : unsigned int
Excitation waveform type
Returns
---------
freq_vec : 1D numpy array
data specific to specified excitation waveform
"""
h5_bin_wfm_type = get_auxiliary_datasets(h5_main, aux_dset_name=['Bin_Wfm_Type'])[0]
inds = np.where(h5_bin_wfm_type.value == wave_type)[0]
return h5_other[slice(inds[0], inds[-1] + 1)]
[docs]
def getIndicesforPlotGroup(h5_udvs_inds, ds_udvs, plt_grp_name):
"""
For a provided plot group name in the udvs table, this function
returns the corresponding spectroscopic indices that can be used to index / slice the main data set
and the data within the udvs table for the requested plot group
* Developer note - Replace the first parameter with the Raw_Data dataset
Parameters
------------
h5_udvs_inds : Reference to HDF5 dataset
containing the UDVS indices
ds_udvs : Reference to HDF5 dataset
containing the UDVS table
plt_grp_name : string
name of the plot group in the UDVS table
Returns
-----------
step_bin_indices : 2D numpy array
Indices arranged as [step, bin] in the spectroscopic_indices table
This is useful for knowing the number of bins and steps in this plot group.
We are allowed to assume that the number of bins does NOT change within the plot group
oneD_indices : 1D numpy array
spectroscopic indices corresponding to the requested plot group
udvs_plt_grp_col : 1D numpy array
data contained within the udvs table for the requested plot group
"""
# working on the UDVS table first:
# getting the numpy array corresponding the requested plot group
udvs_col_data = np.squeeze(ds_udvs[ds_udvs.attrs.get(plt_grp_name)])
# All UDVS steps that are NOT part of the plot grop are empty cells in the table
# and hence assume a nan value.
# getting the udvs step indices that belong to this plot group:
step_inds = np.where(np.isnan(udvs_col_data) is False)[0]
# Getting the values in that plot group that were non NAN
udvs_plt_grp_col = udvs_col_data[step_inds]
# ---------------------------------
# Now we use the udvs step indices calculated above to get
# the indices in the spectroscopic indices table
spec_ind_udvs_step_col = h5_udvs_inds[h5_udvs_inds.attrs.get('UDVS_Step')]
num_bins = len(np.where(spec_ind_udvs_step_col == step_inds[0])[0])
# Stepehen says that we can assume that the number of bins will NOT change in a plot group
step_bin_indices = np.zeros(shape=(len(step_inds), num_bins), dtype=int)
for indx, step in enumerate(step_inds):
step_bin_indices[indx, :] = np.where(spec_ind_udvs_step_col == step)[0]
oneD_indices = step_bin_indices.reshape((step_bin_indices.shape[0] * step_bin_indices.shape[1]))
return step_bin_indices, oneD_indices, udvs_plt_grp_col
[docs]
def generateTestSpectroscopicData(num_bins=7, num_steps=3, num_pos=4):
"""
Generates a (preferably small) test data set using the given parameters.
Data is filled with indices (with base 1 for simplicity).
Use this for testing reshape operations etc.
Parameters
----------
num_bins : unsigned int (Optional. Default = 7)
Number of bins
num_steps : unsigned int (Optional. Default = 3)
Number of spectroscopic steps
num_pos : unsigned int (Optional. Default = 4)
Number of fictional positions
Returns
--------------
full_data : 2D numpy array
Data organized as [steps x bins, positions]
"""
full_data = np.zeros((num_steps * num_bins, num_pos))
for pos in range(num_pos):
bin_count = 0
for step in range(num_steps):
for bind in range(num_bins):
full_data[bin_count, pos] = (pos + 1) * 100 + (step + 1) * 10 + (bind + 1)
bin_count += 1
return full_data
[docs]
def isSimpleDataset(h5_main, isBEPS=True):
"""
This function figures out if a single number defines the bins for all UDVS steps
In such cases (udvs_steps x bins, pos) can be reshaped to (bins, positions x steps)
for (theoretically) faster computation, especially for large datasets
Actually, things are a lot simpler. Only need to check if number of bins for all excitation waveforms are equal
Parameters
-------------
h5_main : Reference to HDF5 dataset
Raw_Data dataset
isBEPS : Boolean (default = True)
Whether or not this dataset is BEPS
Returns
----------
data_type : Boolean
Whether or not this dataset can be unraveled / flattened
"""
if isBEPS:
beps_modes = ['DC modulation mode', 'AC modulation mode with time reversal', 'current mode', 'Relaxation']
if h5_main.parent.parent.attrs['VS_mode'] in beps_modes:
# I am pretty sure that AC modulation also is simple
return True
else:
# Could be user defined or some other kind I am not aware of
# In many cases, some of these datasets could also potentially be simple datasets
ds_udvs = get_auxiliary_datasets(h5_main, aux_dset_name=['UDVS'])[0]
excit_wfms = ds_udvs[ds_udvs.attrs.get('wave_mod')]
wfm_types = np.unique(excit_wfms)
if len(wfm_types) == 1:
# BEPS with single excitation waveform
print('Single BEPS excitation waveform')
return True
else:
# Multiple waveform types here
harm_types = np.unique(np.abs(wfm_types))
if len(harm_types) != 1:
# eg - excitaiton waveforms 1, 2, 3 NOT -1, +1
return False
# In this case a single excitation waveform with forward and reverse was used.
h5_bin_wfm_type = get_auxiliary_datasets(h5_main, aux_dset_name=['Bin_Wfm_Type'])[0]
# Now for each wfm type, count number of bins.
wfm_bin_count = []
for wfm in wfm_types:
wfm_bin_count.append(np.where(h5_bin_wfm_type.value == wfm)[0])
wfm_lengths = np.unique(np.array(wfm_bin_count))
if len(wfm_lengths) == 1:
# BEPS with multiple excitation waveforms but each excitation waveform has same number of bins
print('All BEPS excitation waves have same number of bins')
return True
return False
else:
# BE-Line
return True