# -*- coding: utf-8 -*-
"""
:class:`~pycroscopy.analysis.be_sho_fitter.BESHOfitter` that fits band
excitation cantilever vibration response to a Simple Harmonic Oscillator model
Created on Thu Nov 20 11:48:53 2019
@author: Suhas Somnath, Chris R. Smith
"""
from __future__ import division, print_function, absolute_import, \
unicode_literals
from enum import Enum
from warnings import warn
import numpy as np
from functools import partial
from sidpy.hdf.hdf_utils import write_simple_attrs, get_attr
from pyUSID.io.hdf_utils import create_results_group, write_main_dataset, \
write_reduced_anc_dsets, create_empty_dataset
from pyUSID.io.usi_data import USIDataset
from scipy.signal import find_peaks_cwt
from .utils.be_sho import SHOestimateGuess, SHOfunc
from .fitter import Fitter
# Compound dtype shared by the Guess and Fit datasets created during SHO
# fitting: one float32 per SHO parameter plus the R^2 goodness-of-fit.
_field_names = ['Amplitude [V]', 'Frequency [Hz]', 'Quality Factor',
                'Phase [rad]', 'R2 Criterion']
sho32 = np.dtype({'names': _field_names,
                  'formats': [np.float32] * len(_field_names)})
class SHOGuessFunc(Enum):
    """Strategies available for generating the initial SHO parameter guess."""

    complex_gaussian = 0
    wavelet_peaks = 1
class SHOFitFunc(Enum):
    """Strategies available for refining the SHO guess into a fit."""

    least_squares = 0
class BESHOfitter(Fitter):
def __init__(self, h5_main, **kwargs):
    """
    Creates an instance of the BESHOFitter class

    Parameters
    ----------
    h5_main : pyUSID.io.USIDataset
        Main dataset containing band excitation measurement
    h5_target_group : h5py.Group, optional. Default = None
        Location where to look for existing results and to place newly
        computed results. Use this kwarg if the results need to be written
        to a different HDF5 file. By default, this value is set to the
        parent group containing `h5_main`
    kwargs : dict, optional
        Keyword arguments such as "verbose" and "cores" that will be
        passed onto :class:`~pyUSID.processing.process.Process`
    """
    super(BESHOfitter, self).__init__(h5_main, "SHO_Fit",
                                      variables=['Frequency'], **kwargs)

    # Populated later by set_up_guess() / set_up_fit()
    self.parms_dict = None

    # Spectroscopic dimension that gets collapsed by the SHO fit
    self._fit_dim_name = 'Frequency'

    # Extract some basic parameters that are necessary for either the guess
    # or fit
    freq_dim_ind = self.h5_main.spec_dim_labels.index('Frequency')
    # Columns of the main dataset where a BE pulse / UDVS step begins,
    # i.e. where the Frequency index resets to 0
    self.step_start_inds = np.where(self.h5_main.h5_spec_inds[freq_dim_ind] == 0)[0]
    self.num_udvs_steps = len(self.step_start_inds)

    # find the frequency vector and hold in memory
    self.freq_vec = None
    self._get_frequency_vector()

    # This is almost always True but think of this more as a sanity check.
    self.is_reshapable = _is_reshapable(self.h5_main, self.step_start_inds)

    # accounting for memory copies
    self._max_raw_pos_per_read = self._max_pos_per_read

    # set limits in the set up functions
    # Size (bytes) of one position's worth of compound results
    self.results_pix_byte_size = sho32.itemsize * self.num_udvs_steps
def _get_frequency_vector(self):
    """
    Reads the frequency vector from the Spectroscopic_Values dataset.
    This assumes that the data is reshape-able.

    Sets ``self.freq_vec`` - the frequencies of a single BE pulse.
    """
    h5_spec_vals = self.h5_main.h5_spec_vals
    # Row of the spectroscopic values matrix holding the 'Frequency' dimension
    freq_dim = np.argwhere('Frequency' == np.array(self.h5_main.spec_dim_labels)).squeeze()

    if len(self.step_start_inds) == 1:  # BE-Line
        # Single step - the frequency vector spans all columns
        end_ind = h5_spec_vals.shape[1]
    else:  # BEPS
        # Multiple steps - take the frequencies of the first step only
        end_ind = self.step_start_inds[1]

    self.freq_vec = h5_spec_vals[freq_dim, self.step_start_inds[0]:end_ind]
def _create_guess_datasets(self):
    """
    Creates the h5 group, guess dataset, corresponding spectroscopic datasets and also
    links the guess dataset to the spectroscopic datasets.
    """
    self.h5_results_grp = create_results_group(self.h5_main,
                                               self.process_name,
                                               h5_parent_group=self._h5_target_group)

    write_simple_attrs(self.h5_results_grp, self.parms_dict)

    # If writing to a new HDF5 file:
    # Add back the data_type attribute - still being used in the visualizer
    if self.h5_results_grp.file != self.h5_main.file:
        write_simple_attrs(self.h5_results_grp.file,
                           {'data_type': get_attr(self.h5_main.file,
                                                  'data_type')})

    # Ancillary spectroscopic datasets for the results, reduced along
    # self._fit_dim_name ('Frequency')
    ret_vals = write_reduced_anc_dsets(self.h5_results_grp,
                                       self.h5_main.h5_spec_inds,
                                       self.h5_main.h5_spec_vals,
                                       self._fit_dim_name,
                                       verbose=self.verbose)

    h5_sho_inds, h5_sho_vals = ret_vals

    # One compound (sho32) entry per position per BE pulse / UDVS step
    self._h5_guess = write_main_dataset(self.h5_results_grp,
                                        (self.h5_main.shape[0],
                                         self.num_udvs_steps),
                                        'Guess', 'SHO', 'compound',
                                        None, None,
                                        h5_pos_inds=self.h5_main.h5_pos_inds,
                                        h5_pos_vals=self.h5_main.h5_pos_vals,
                                        h5_spec_inds=h5_sho_inds,
                                        h5_spec_vals=h5_sho_vals,
                                        chunks=(1, self.num_udvs_steps),
                                        dtype=sho32,
                                        main_dset_attrs=self.parms_dict,
                                        verbose=self.verbose)

    # Does not make sense to propagate region refs - nobody uses them
    # copy_region_refs(self.h5_main, self._h5_guess)

    self._h5_guess.file.flush()

    if self.verbose and self.mpi_rank == 0:
        print('Finished creating Guess dataset')
def _create_fit_datasets(self):
    """
    Creates the HDF5 fit dataset. pycroscopy requires that the h5 group, guess dataset,
    corresponding spectroscopic and position datasets be created and populated at this point.
    This function will create the HDF5 dataset for the fit and link it to same ancillary datasets as the guess.
    The fit dataset will NOT be populated here but will instead be populated using the __setData function
    """
    # Guard: fitting requires that a guess was set up first
    if self._h5_guess is None or self.h5_results_grp is None:
        warn('Need to guess before fitting!')
        return

    """
    Once the guess is complete, the last_pixel attribute will be set to complete for the group.
    Once the fit is initiated, during the creation of the status dataset, this last_pixel
    attribute will be used and it wil make the fit look like it was already complete. Which is not the case.
    This is a problem of doing two processes within the same group.
    Until all legacy is removed, we will simply reset the last_pixel attribute.
    """
    self.h5_results_grp.attrs['last_pixel'] = 0

    write_simple_attrs(self.h5_results_grp, self.parms_dict)

    # Create the fit dataset as an empty dataset of the same size and dtype
    # as the guess.
    # Also automatically links in the ancillary datasets.
    self._h5_fit = USIDataset(create_empty_dataset(self._h5_guess,
                                                   dtype=sho32,
                                                   dset_name='Fit'))

    self._h5_fit.file.flush()

    if self.verbose and self.mpi_rank == 0:
        print('Finished creating Fit dataset')
def _read_data_chunk(self):
    """
    Returns the next chunk of data for the guess or the fit
    """
    # The Fitter class should take care of all the basic reading
    super(BESHOfitter, self)._read_data_chunk()

    # At this point the self.data object is the raw data that needs to be
    # reshaped to a single UDVS step:
    if self.data is not None:
        if self.verbose and self.mpi_rank == 0:
            print('Got raw data of shape {} from super'
                  '.'.format(self.data.shape))
        # (positions, steps * bins) -> (positions * steps, bins) so the map
        # function sees one BE response vector per "pixel"
        self.data = _reshape_to_one_step(self.data, self.num_udvs_steps)
        if self.verbose and self.mpi_rank == 0:
            print('Reshaped raw data to shape {}'.format(self.data.shape))
def _read_guess_chunk(self):
    """
    Reads a chunk of the guess dataset corresponding to the main dataset
    and flattens the compound records into a plain float matrix.

    Sets ``self._guess`` to a 2D array with one row per (position * UDVS
    step) and one column per retained field of the compound dtype - the
    'R2 Criterion' field is dropped.
    """
    # The Fitter class should take care of all the basic reading
    super(BESHOfitter, self)._read_guess_chunk()

    # Unravel (positions, steps) -> (positions * steps, 1)
    self._guess = _reshape_to_one_step(self._guess, self.num_udvs_steps)

    # bear in mind that this self._guess is a compound dataset. Convert to float32
    # don't keep the R^2.
    self._guess = np.hstack([self._guess[name] for name in self._guess.dtype.names if name != 'R2 Criterion'])
def _write_results_chunk(self):
    """
    Writes the provided chunk of data into the guess or fit datasets.
    This method is responsible for any and all book-keeping.
    """
    prefix = 'guess' if self._is_guess else 'fit'
    # Convert raw per-pixel results into the compound sho32 representation.
    # NOTE(review): this requires parms_dict to contain the key
    # '<prefix>-algorithm', set earlier in set_up_guess() / set_up_fit()
    self._results = self._reformat_results(self._results,
                                           self.parms_dict[prefix + '-algorithm'])

    if self._is_guess:
        self._guess = np.hstack(tuple(self._results))
        # prepare to reshape:
        self._guess = np.transpose(np.atleast_2d(self._guess))
        if self.verbose and self.mpi_rank == 0:
            print('Prepared guess of shape {} before reshaping'.format(self._guess.shape))
        # (positions * steps, 1) -> (positions, steps) to match HDF5 layout
        self._guess = _reshape_to_n_steps(self._guess, self.num_udvs_steps)
        if self.verbose and self.mpi_rank == 0:
            print('Reshaped guess to shape {}'.format(self._guess.shape))
    else:
        self._fit = self._results
        self._fit = np.transpose(np.atleast_2d(self._fit))
        self._fit = _reshape_to_n_steps(self._fit, self.num_udvs_steps)

    # ask super to take care of the rest, which is a standardized operation
    super(BESHOfitter, self)._write_results_chunk()
def set_up_guess(self, guess_func=SHOGuessFunc.complex_gaussian,
                 h5_partial_guess=None, *func_args, **func_kwargs):
    """
    Need this because during the set up, we won't know which strategy is being used.
    Should Guess be its own Process class in that case? If so, it would end up having
    its own group etc.

    Parameters
    ----------
    guess_func : SHOGuessFunc, optional
        Which guess method to use. Default is complex gaussian
    h5_partial_guess : h5py.Dataset, optional
        Partial guess results dataset to continue computing on
    func_args : optional
        Positional arguments forwarded to the guess function
    func_kwargs : optional
        Keyword arguments forwarded to the guess function

    Raises
    ------
    TypeError
        If ``guess_func`` is not a member of SHOGuessFunc
    """
    self.parms_dict = {'guess-method': "pycroscopy BESHO"}

    if not isinstance(guess_func, SHOGuessFunc):
        raise TypeError('Please supply SHOGuessFunc.complex_gaussian or SHOGuessFunc.wavelet_peaks for the guess_func')

    partial_func = None

    if guess_func == SHOGuessFunc.complex_gaussian:
        num_points = func_kwargs.pop('num_points', 5)
        self.parms_dict.update({'guess-algorithm': 'complex_gaussian',
                                'guess-complex_gaussian-num_points': num_points})
        partial_func = partial(complex_gaussian, w_vec=self.freq_vec,
                               num_points=num_points)
    elif guess_func == SHOGuessFunc.wavelet_peaks:
        peak_width_bounds = func_kwargs.pop('peak_width_bounds', [10, 200])
        peak_width_step = func_kwargs.pop('peak_width_step', 20)

        if len(func_args) > 0:
            # Assume that the first argument is what we are looking for
            peak_width_bounds = func_args[0]

        # BUG FIX: this key was previously written as 'guess_algorithm'
        # (underscore), but _write_results_chunk() reads
        # parms_dict['guess-algorithm'] (hyphen) - which raised a KeyError
        # for wavelet_peaks guesses. Now consistent with the
        # complex_gaussian branch above.
        self.parms_dict.update({'guess-algorithm': 'wavelet_peaks',
                                'guess-wavelet_peaks-peak_width_bounds': peak_width_bounds,
                                'guess-wavelet_peaks-peak_width_step': peak_width_step})
        partial_func = partial(wavelet_peaks, peak_width_bounds=peak_width_bounds,
                               peak_width_step=peak_width_step, **func_kwargs)

    self._map_function = partial_func

    # Positions-per-read must be an integer count; '//' with a float divisor
    # such as 1.5 yields a float which breaks downstream slicing
    self._max_pos_per_read = int(self._max_raw_pos_per_read // 1.5)

    # ask super to take care of the rest, which is a standardized operation
    super(BESHOfitter, self).set_up_guess(h5_partial_guess=h5_partial_guess)
def set_up_fit(self, fit_func=SHOFitFunc.least_squares,
               *func_args, h5_partial_fit=None, h5_guess=None, **func_kwargs):
    """
    Need this because during the set up, we won't know which strategy is being used.
    Should Guess be its own Process class in that case? If so, it would end up having
    its own group etc.

    Parameters
    ----------
    fit_func : SHOFitFunc, optional
        Which fitting strategy to use. Default is least squares
    h5_partial_fit : h5py.Dataset, optional
        Partial fit results dataset to continue computing on
    h5_guess : h5py.Dataset, optional
        Guess results dataset to seed the fit with

    Raises
    ------
    TypeError
        If ``fit_func`` is not a member of SHOFitFunc
    """
    self.parms_dict = {'fit-method': "pycroscopy BESHO"}

    if not isinstance(fit_func, SHOFitFunc):
        raise TypeError('Please supply SHOFitFunc.least_squares for the fit_func')

    if fit_func == SHOFitFunc.least_squares:
        self.parms_dict.update({'fit-algorithm': 'least_squares'})

    # Positions-per-read must be an integer count; '//' with a float divisor
    # such as 1.75 yields a float which breaks downstream slicing
    self._max_pos_per_read = int(self._max_raw_pos_per_read // 1.75)

    # ask super to take care of the rest, which is a standardized operation
    super(BESHOfitter, self).set_up_fit(h5_partial_fit=h5_partial_fit,
                                        h5_guess=h5_guess)
def _unit_compute_fit(self):
    """
    Punts unit computation on a chunk of data to Process
    """
    # Minimizes _sho_error (= 1 - R^2) over each response vector; the
    # frequency vector is the extra objective-function argument and
    # {'jac': 'cs'} is forwarded to the solver as-is
    super(BESHOfitter, self)._unit_compute_fit(_sho_error,
                                               obj_func_args=[self.freq_vec],
                                               solver_options={'jac': 'cs'})
def _reformat_results(self, results, strategy='wavelet_peaks'):
    """
    Model specific calculation and or reformatting of the raw guess or fit results

    Parameters
    ----------
    results : array-like
        Results to be formatted for writing
    strategy : str
        The strategy used in the fit. Determines how the results will be reformatted.
        Default 'wavelet_peaks'

    Returns
    -------
    sho_vec : numpy.ndarray
        The reformatted array of parameters (compound sho32 dtype).
    """
    if self.verbose and self.mpi_rank == 0:
        print('Strategy to use for reformatting results: "{}"'.format(strategy))

    # Create an empty array to store the guess parameters
    sho_vec = np.zeros(shape=(len(results)), dtype=sho32)
    if self.verbose and self.mpi_rank == 0:
        print('Raw results and compound SHO vector of shape {}'.format(len(results)))

    # Extracting and reshaping the remaining parameters for SHO
    if strategy in ['wavelet_peaks', 'relative_maximum', 'absolute_maximum']:
        if self.verbose and self.mpi_rank == 0:
            print('Reformatting results from a peak-position-finding algorithm')
        # wavelet_peaks sometimes finds 0, 1, 2, or more peaks. Need to handle that:
        peak_inds = np.zeros(shape=(len(results)), dtype=np.uint32)
        for pix_ind, pixel in enumerate(results):
            if len(pixel) == 1:  # majority of cases - one peak found
                peak_inds[pix_ind] = pixel[0]
            elif len(pixel) == 0:  # no peak found
                peak_inds[pix_ind] = int(0.5*self.data.shape[1])  # set to center of band
            else:  # more than one peak found
                dist = np.abs(np.array(pixel) - int(0.5*self.data.shape[1]))
                peak_inds[pix_ind] = pixel[np.argmin(dist)]  # set to peak closest to center of band
        if self.verbose and self.mpi_rank == 0:
            print('Peak positions of shape {}'.format(peak_inds.shape))
        # First get the value (from the raw data) at these positions:
        comp_vals = np.array(
            [self.data[pixel_ind, peak_inds[pixel_ind]] for pixel_ind in np.arange(peak_inds.size)])
        if self.verbose and self.mpi_rank == 0:
            print('Complex values at peak positions of shape {}'.format(comp_vals.shape))
        sho_vec['Amplitude [V]'] = np.abs(comp_vals)  # Amplitude
        sho_vec['Phase [rad]'] = np.angle(comp_vals)  # Phase in radians
        sho_vec['Frequency [Hz]'] = self.freq_vec[peak_inds]  # Frequency
        sho_vec['Quality Factor'] = np.ones_like(comp_vals) * 10  # Quality factor
        # BUG FIX: previously called self.r_square() / self._sho_func which
        # are not attributes of this class (AttributeError). Use the
        # module-level _r_square() with the imported SHOfunc instead, and
        # evaluate it per pixel against that pixel's response vector. The
        # first four compound fields (Amp, w0, Q, phi) are the SHO
        # parameters expected by SHOfunc.
        sho_vec['R2 Criterion'] = np.array(
            [_r_square(self.data[pix_ind], SHOfunc,
                       tuple(sho_parms)[:4], self.freq_vec)
             for pix_ind, sho_parms in enumerate(sho_vec)])
    elif strategy in ['complex_gaussian']:
        if self.verbose and self.mpi_rank == 0:
            print('Reformatting results from the SHO Guess algorithm')
        # Each result is a 5-element vector: Amp, w0, Q, phi, R^2
        for iresult, result in enumerate(results):
            sho_vec['Amplitude [V]'][iresult] = result[0]
            sho_vec['Frequency [Hz]'][iresult] = result[1]
            sho_vec['Quality Factor'][iresult] = result[2]
            sho_vec['Phase [rad]'][iresult] = result[3]
            sho_vec['R2 Criterion'][iresult] = result[4]
    elif strategy in ['least_squares']:
        if self.verbose and self.mpi_rank == 0:
            print('Reformatting results from a list of least_squares result objects')
        # Each result is a scipy optimizer result: parameters in .x,
        # final objective (1 - R^2) in .fun
        for iresult, result in enumerate(results):
            sho_vec['Amplitude [V]'][iresult] = result.x[0]
            sho_vec['Frequency [Hz]'][iresult] = result.x[1]
            sho_vec['Quality Factor'][iresult] = result.x[2]
            sho_vec['Phase [rad]'][iresult] = result.x[3]
            sho_vec['R2 Criterion'][iresult] = 1-result.fun
    else:
        if self.verbose and self.mpi_rank == 0:
            print('_reformat_results() will not reformat results since the provided algorithm: {} does not match anything that this function can handle.'.format(strategy))

    return sho_vec
def _reshape_to_one_step(raw_mat, num_steps):
"""
Reshapes provided data from (pos, step * bin) to (pos * step, bin).
This is useful when unraveling data for parallel processing.
Parameters
-------------
raw_mat : 2D numpy array
Data organized as (positions, step * bins)
num_steps : unsigned int
Number of spectroscopic steps per pixel (eg - UDVS steps)
Returns
--------------
two_d : 2D numpy array
Data rearranged as (positions * step, bin)
"""
num_pos = raw_mat.shape[0]
num_bins = int(raw_mat.shape[1] / num_steps)
one_d = raw_mat
one_d = one_d.reshape((num_bins * num_steps * num_pos))
two_d = one_d.reshape((num_steps * num_pos, num_bins))
return two_d
def _reshape_to_n_steps(raw_mat, num_steps):
"""
Reshapes provided data from (positions * step, bin) to (positions, step * bin).
Use this to restructure data back to its original form after parallel computing
Parameters
--------------
raw_mat : 2D numpy array
Data organized as (positions * step, bin)
num_steps : unsigned int
Number of spectroscopic steps per pixel (eg - UDVS steps)
Returns
---------------
two_d : 2D numpy array
Data rearranged as (positions, step * bin)
"""
num_bins = raw_mat.shape[1]
num_pos = int(raw_mat.shape[0] / num_steps)
one_d = raw_mat
one_d = one_d.reshape(num_bins * num_steps * num_pos)
two_d = one_d.reshape((num_pos, num_steps * num_bins))
return two_d
def _is_reshapable(h5_main, step_start_inds=None):
    """
    A BE dataset is said to be reshape-able if the number of bins per steps is constant. Even if the dataset contains
    multiple excitation waveforms (harmonics), We know that the measurement is always at the resonance peak, so the
    frequency vector should not change.

    Parameters
    ----------
    h5_main : h5py.Dataset object
        Reference to the main dataset
    step_start_inds : list or 1D array
        Indices that correspond to the start of each BE pulse / UDVS step

    Returns
    ---------
    reshapable : Boolean
        Whether or not the number of bins per step are constant in this dataset
    """
    h5_main = USIDataset(h5_main)
    if step_start_inds is None:
        step_start_inds = np.where(h5_main.h5_spec_inds[0] == 0)[0]
    # Treat the end of the dataset as one extra (virtual) step boundary so
    # np.diff yields the bin count of every step including the last
    boundaries = np.hstack((step_start_inds, h5_main.shape[1]))
    bins_per_step = np.diff(boundaries)
    # Reshapable exactly when every step has the same number of bins
    return np.unique(bins_per_step).size == 1
def _r_square(data_vec, func, *args, **kwargs):
"""
R-square for estimation of the fitting quality
Typical result is in the range (0,1), where 1 is the best fitting
Parameters
----------
data_vec : array_like
Measured data points
func : callable function
Should return a numpy.ndarray of the same shape as data_vec
args :
Parameters to be pased to func
kwargs :
Keyword parameters to be pased to func
Returns
-------
r_squared : float
The R^2 value for the current data_vec and parameters
"""
data_mean = np.mean(data_vec)
ss_tot = sum(abs(data_vec - data_mean) ** 2)
ss_res = sum(abs(data_vec - func(*args, **kwargs)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot > 0 else 0
return r_squared
def wavelet_peaks(vector, peak_width_bounds, peak_width_step=20, **kwargs):
    """
    This is the function that will be mapped by multiprocess. This is a wrapper around the scipy function.
    It uses a parameter - wavelet_widths that is configured outside this function.

    Parameters
    ----------
    vector : 1D numpy array
        Feature vector containing peaks
    peak_width_bounds : sequence of two numbers
        Lower and upper bounds of the wavelet widths to scan
    peak_width_step : int, optional. Default = 20
        Number of wavelet widths to generate between the two bounds
        (passed to np.linspace as the sample count)
    kwargs : optional
        Passed on to scipy.signal.find_peaks_cwt()

    Returns
    -------
    peak_indices : list
        List of indices of peaks within the prescribed peak widths
    """
    # Widths of the wavelets that find_peaks_cwt convolves against |vector|
    widths = np.linspace(peak_width_bounds[0], peak_width_bounds[1],
                         peak_width_step)
    return find_peaks_cwt(np.abs(vector), widths, **kwargs)
def complex_gaussian(resp_vec, w_vec, num_points=5):
    """
    Sets up the needed parameters for the analytic approximation for the
    Gaussian fit of complex data.

    Parameters
    ----------
    resp_vec : numpy.ndarray
        Data vector to be fit.
    w_vec : numpy.ndarray
        Frequency vector corresponding to `resp_vec`
    num_points : int, optional. Default = 5
        Passed on to SHOestimateGuess()

    Returns
    -------
    numpy.ndarray
        SHO parameter guess with its R^2 value appended as the last element.
    """
    sho_parms = SHOestimateGuess(resp_vec, w_vec, num_points)

    # Calculate the goodness-of-fit of this guess and append it
    fit_quality = _r_square(resp_vec, SHOfunc, sho_parms, w_vec)
    return np.hstack([sho_parms, np.array(fit_quality)])
def _sho_error(guess, data_vec, freq_vector):
"""
Generates the single Harmonic Oscillator response over the given vector
Parameters
----------
guess : array-like
The set of guess parameters (Amp,w0,Q,phi) to be tested
data_vec : numpy.ndarray
The data vector to compare the current guess against
freq_vector : numpy.ndarray
The frequencies that correspond to each data point in `data_vec`
Notes
-----
Amp: amplitude
w0: resonant frequency
Q: Quality Factor
phi: Phase
Returns
-------
fitness : float
The 1-r^2 value for the current set of SHO coefficients
"""
if len(guess) < 4:
raise ValueError(
'Error: The Single Harmonic Oscillator requires 4 parameter guesses!')
Amp, w_0, Q, phi = guess[:4]
guess_vec = Amp * np.exp(1.j * phi) * w_0 ** 2 / (
freq_vector ** 2 - 1j * freq_vector * w_0 / Q - w_0 ** 2)
data_mean = np.mean(data_vec)
ss_tot = np.sum(np.abs(data_vec - data_mean) ** 2)
ss_res = np.sum(np.abs(data_vec - guess_vec) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot > 0 else 0
# print('tot: {}\tres: {}\tr2: {}'.format(ss_tot, ss_res, r_squared))
return 1 - r_squared