Source code for BGlib.gmode.proc.signal_filter

# -*- coding: utf-8 -*-
"""
A class for FFT filtering General-mode spectroscopic imaging data as reported in:
[Rapid mapping of polarization switching through complete information acquisition](http://www.nature.com/articles/ncomms13290)

Created on Tue Nov 07 11:48:53 2017

@author: Suhas Somnath

"""

from __future__ import division, print_function, absolute_import, unicode_literals
import h5py
import numpy as np
from collections.abc import Iterable

from pyUSID.processing.comp_utils import parallel_compute
from sidpy.hdf.hdf_utils import write_simple_attrs

from pyUSID import Dimension
from pyUSID.io.hdf_utils import create_results_group, write_main_dataset, create_empty_dataset, \
    write_ind_val_dsets
from pyUSID.processing.process import Process

from .fft import get_noise_floor, are_compatible_filters, build_composite_freq_filter
from .gmode_utils import test_filter

# TODO: correct implementation of num_pix


class SignalFilter(Process):

    def __init__(self, h5_main, frequency_filters=None, noise_threshold=None,
                 write_filtered=True, write_condensed=False, num_pix=1,
                 phase_rad=0, **kwargs):
        """
        Filters the entire h5 dataset with the given filtering parameters.

        Parameters
        ----------
        h5_main : h5py.Dataset object
            Dataset to process
        frequency_filters : (Optional) single or list of pycroscopy.fft.FrequencyFilter objects
            Frequency (vertical) filters to apply to signal
        noise_threshold : (Optional) float. Default - None
            Noise tolerance to apply to data. Value must be within (0, 1)
        write_filtered : (Optional) bool. Default - True
            Whether to write the filtered data to file
        write_condensed : (Optional) bool. Default - False
            Whether to write the condensed data in frequency space to file.
            Use this for datasets that are very large but sparse in frequency space.
        num_pix : (Optional) uint. Default - 1
            Number of pixels to use for filtering. More pixels means a lower
            noise floor and the ability to pick up weaker signals. Use only if
            absolutely necessary. This value must be a divisor of the number of
            pixels in the dataset
        phase_rad : (Optional) float
            Radians by which the output is rotated with respect to the input to
            compensate for phase lag. This feature has NOT yet been implemented.
        h5_target_group : h5py.Group, optional. Default = None
            Location where to look for existing results and to place newly
            computed results. Use this kwarg if the results need to be written
            to a different HDF5 file. By default, this value is set to the
            parent group containing `h5_main`
        kwargs : (Optional) dictionary
            Please see Process class for additional inputs
        """
        super(SignalFilter, self).__init__(h5_main, 'FFT_Filtering', **kwargs)

        if frequency_filters is None and noise_threshold is None:
            raise ValueError('Need to specify at least some noise thresholding / frequency filter')

        if noise_threshold is not None:
            if noise_threshold >= 1 or noise_threshold <= 0:
                raise ValueError('Noise threshold must be within (0, 1)')

        self.composite_filter = 1
        if frequency_filters is not None:
            if not isinstance(frequency_filters, Iterable):
                frequency_filters = [frequency_filters]
            if not are_compatible_filters(frequency_filters):
                raise ValueError('frequency filters must be a single or list of FrequencyFilter objects')
            self.composite_filter = build_composite_freq_filter(frequency_filters)
        else:
            write_condensed = False

        if write_filtered is False and write_condensed is False:
            raise ValueError('You need to write the filtered and/or the condensed dataset to the file')

        num_effective_pix = h5_main.shape[0] * 1.0 / num_pix
        if num_effective_pix % 1 > 0:
            raise ValueError('Number of pixels not divisible by the number of pixels to use for FFT filter')
        self.num_effective_pix = int(num_effective_pix)

        self.phase_rad = phase_rad
        self.noise_threshold = noise_threshold
        self.frequency_filters = frequency_filters
        self.write_filtered = write_filtered
        self.write_condensed = write_condensed

        """
        Remember that the default number of pixels corresponds to only the raw data that can be held in memory.
        In the case of signal filtering, the datasets that will occupy space are:
        1. Raw, 2. Filtered (real + frequency space copies), 3. Condensed (substantially less space).
        The actual scaling of memory depends on options:
        """
        scaling_factor = 1 + 2 * self.write_filtered + 0.25 * self.write_condensed
        self._max_pos_per_read = int(self._max_pos_per_read / scaling_factor)

        if self.verbose and self.mpi_rank == 0:
            print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read))

        self.parms_dict = dict()
        if self.frequency_filters is not None:
            for freq_filter in self.frequency_filters:
                self.parms_dict.update(freq_filter.get_parms())
        if self.noise_threshold is not None:
            self.parms_dict['noise_threshold'] = self.noise_threshold
        self.parms_dict['num_pix'] = self.num_effective_pix

        self.duplicate_h5_groups, self.partial_h5_groups = self._check_for_duplicates()

        self.data = None
        self.filtered_data = None
        self.condensed_data = None
        self.noise_floors = None

        self.h5_filtered = None
        self.h5_condensed = None
        self.h5_noise_floors = None
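    # A minimal construction sketch (kept as a comment; not executed on import)
    # of the frequency_filters argument expected above. The LowPassFilter class
    # and its (signal_length, samp_rate, f_cutoff) signature are assumptions
    # based on the sibling .fft module - verify names and signatures there
    # before use; the sampling rate and cutoff values are illustrative:
    #
    #     from BGlib.gmode.proc.fft import LowPassFilter
    #     samp_rate = 10E+6  # Hz; hypothetical sampling rate
    #     filts = [LowPassFilter(h5_main.shape[1], samp_rate, 200E+3)]
    #     sig_filt = SignalFilter(h5_main, frequency_filters=filts,
    #                             noise_threshold=1E-4, write_filtered=True)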
    def test(self, pix_ind=None, excit_wfm=None, **kwargs):
        """
        Tests the signal filter on a single pixel (randomly chosen unless manually specified) worth of data.

        Parameters
        ----------
        pix_ind : int, optional. default = random
            Index of the pixel whose data will be used for inference
        excit_wfm : array-like, optional. default = None
            Waveform against which the raw and filtered signals will be plotted.
            This waveform can be a fraction of the length of a single pixel's
            data. For example, in the case of G-mode, where a single scan line
            is yet to be broken down into pixels, the excitation waveform for a
            single pixel can be provided to automatically break the raw and
            filtered responses also into chunks of the same size.

        Returns
        -------
        fig, axes
        """
        if self.mpi_rank > 0:
            return
        if pix_ind is None:
            pix_ind = np.random.randint(0, high=self.h5_main.shape[0])
        return test_filter(self.h5_main[pix_ind], frequency_filters=self.frequency_filters,
                           excit_wfm=excit_wfm, noise_threshold=self.noise_threshold,
                           plot_title='Pos #' + str(pix_ind), show_plots=True, **kwargs)
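    # Hypothetical preview before the full run (the pixel index is illustrative):
    #
    #     fig, axes = sig_filt.test(pix_ind=245)
    #
    # test() plots the raw vs. filtered response for one pixel so that the
    # filter parameters can be tuned cheaply before committing to compute()
    # over the entire dataset.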
    def _create_results_datasets(self):
        """
        Creates all the datasets necessary for holding all parameters + data.
        """
        self.h5_results_grp = create_results_group(self.h5_main, self.process_name,
                                                   h5_parent_group=self._h5_target_group)

        self.parms_dict.update({'last_pixel': 0, 'algorithm': 'pycroscopy_SignalFilter'})

        write_simple_attrs(self.h5_results_grp, self.parms_dict)

        assert isinstance(self.h5_results_grp, h5py.Group)

        if isinstance(self.composite_filter, np.ndarray):
            h5_comp_filt = self.h5_results_grp.create_dataset('Composite_Filter',
                                                              data=np.float32(self.composite_filter))

            if self.verbose and self.mpi_rank == 0:
                print('Rank {} - Finished creating the Composite_Filter dataset'.format(self.mpi_rank))

        # First create the position datasets if the new indices are smaller...
        if self.num_effective_pix != self.h5_main.shape[0]:
            # TODO: Do this part correctly. See past solution:
            """
            # need to make new position datasets by taking every n'th index / value:
            new_pos_vals = np.atleast_2d(h5_pos_vals[slice(0, None, self.num_effective_pix), :])
            pos_descriptor = []
            for name, units, leng in zip(h5_pos_inds.attrs['labels'],
                                         h5_pos_inds.attrs['units'],
                                         [int(np.unique(h5_pos_inds[:, dim_ind]).size / self.num_effective_pix)
                                          for dim_ind in range(h5_pos_inds.shape[1])]):
                pos_descriptor.append(Dimension(name, units, np.arange(leng)))
            ds_pos_inds, ds_pos_vals = build_ind_val_dsets(pos_descriptor, is_spectral=False, verbose=self.verbose)
            h5_pos_vals.data = np.atleast_2d(new_pos_vals)  # The data generated above varies linearly. Override.
            """
            h5_pos_inds_new, h5_pos_vals_new = write_ind_val_dsets(self.h5_results_grp,
                                                                   Dimension('pixel', 'a.u.',
                                                                             self.num_effective_pix),
                                                                   is_spectral=False,
                                                                   verbose=self.verbose and self.mpi_rank == 0)
            if self.verbose and self.mpi_rank == 0:
                print('Rank {} - Created the new position ancillary dataset'.format(self.mpi_rank))

        else:
            h5_pos_inds_new = self.h5_main.h5_pos_inds
            h5_pos_vals_new = self.h5_main.h5_pos_vals

            if self.verbose and self.mpi_rank == 0:
                print("Rank {} - Reusing the source dataset's position datasets".format(self.mpi_rank))

        if self.noise_threshold is not None:
            self.h5_noise_floors = write_main_dataset(self.h5_results_grp, (self.num_effective_pix, 1),
                                                      'Noise_Floors', 'Noise', 'a.u.', None,
                                                      Dimension('arb', '', [1]),
                                                      dtype=np.float32, aux_spec_prefix='Noise_Spec_',
                                                      h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new,
                                                      verbose=self.verbose and self.mpi_rank == 0)
            if self.verbose and self.mpi_rank == 0:
                print('Rank {} - Finished creating the Noise_Floors dataset'.format(self.mpi_rank))

        if self.write_filtered:
            # Filtered data is identical to Main_Data in every way - just a duplicate
            self.h5_filtered = create_empty_dataset(self.h5_main, self.h5_main.dtype, 'Filtered_Data',
                                                    h5_group=self.h5_results_grp)
            if self.verbose and self.mpi_rank == 0:
                print('Rank {} - Finished creating the Filtered dataset'.format(self.mpi_rank))

        self.hot_inds = None

        if self.write_condensed:
            self.hot_inds = np.where(self.composite_filter > 0)[0]
            # The spectrum of a real signal is conjugate-symmetric - only need to keep half the data
            self.hot_inds = self.hot_inds[int(0.5 * len(self.hot_inds)):].astype(np.uint)
            # The spectral dimension must match the second axis of the dataset below:
            condensed_spec = Dimension('hot_frequencies', '', len(self.hot_inds))
            self.h5_condensed = write_main_dataset(self.h5_results_grp,
                                                   (self.num_effective_pix, len(self.hot_inds)),
                                                   'Condensed_Data', 'Complex', 'a.u.', None, condensed_spec,
                                                   h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new,
                                                   dtype=np.complex128,
                                                   verbose=self.verbose and self.mpi_rank == 0)
            if self.verbose and self.mpi_rank == 0:
                print('Rank {} - Finished creating the Condensed dataset'.format(self.mpi_rank))

        if self.mpi_size > 1:
            self.mpi_comm.Barrier()
        self.h5_main.file.flush()

    def _get_existing_datasets(self):
        """
        Extracts references to the existing datasets that hold the results
        """
        if self.write_filtered:
            self.h5_filtered = self.h5_results_grp['Filtered_Data']
        if self.write_condensed:
            self.h5_condensed = self.h5_results_grp['Condensed_Data']
        if self.noise_threshold is not None:
            self.h5_noise_floors = self.h5_results_grp['Noise_Floors']

    def _write_results_chunk(self):
        """
        Writes data chunks back to the file
        """
        # Get access to the private variable:
        pos_in_batch = self._get_pixels_in_current_batch()

        if self.write_condensed:
            self.h5_condensed[pos_in_batch, :] = self.condensed_data
        if self.noise_threshold is not None:
            self.h5_noise_floors[pos_in_batch, :] = np.atleast_2d(self.noise_floors)
        if self.write_filtered:
            self.h5_filtered[pos_in_batch, :] = self.filtered_data

        # Not responsible for checkpointing anymore. Process class handles this.

    def _unit_computation(self, *args, **kwargs):
        """
        Processing per chunk of the dataset

        Parameters
        ----------
        args : list
            Not used
        kwargs : dictionary
            Not used
        """
        # get FFT of the entire data chunk
        self.data = np.fft.fftshift(np.fft.fft(self.data, axis=1), axes=1)

        if self.noise_threshold is not None:
            self.noise_floors = parallel_compute(self.data, get_noise_floor, cores=self._cores,
                                                 func_args=[self.noise_threshold],
                                                 verbose=self.verbose)

        if isinstance(self.composite_filter, np.ndarray):
            # multiply the FFT of the data chunk with the composite filter
            self.data *= self.composite_filter

        if self.noise_threshold is not None:
            # apply thresholding: compare each position's spectrum against its own noise floor
            self.data[np.abs(self.data) < np.tile(np.atleast_2d(self.noise_floors),
                                                  self.data.shape[1])] = 1E-16

        if self.write_condensed:
            # set self.condensed_data here
            self.condensed_data = self.data[:, self.hot_inds]

        if self.write_filtered:
            # take the inverse FFT to get the filtered signal back in the time domain
            self.filtered_data = np.real(np.fft.ifft(np.fft.ifftshift(self.data, axes=1), axis=1))

            if self.phase_rad > 0:
                # TODO: implement phase compensation
                # do np.roll on data
                # self.data = np.roll(self.data, 0, axis=1)
                pass
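
# A minimal end-to-end sketch, assuming a USID-formatted HDF5 file containing a
# main dataset of G-mode time-domain responses. The file path, dataset name,
# sampling rate, and filter parameters below are all illustrative assumptions,
# not values prescribed by this module.
if __name__ == '__main__':
    from pyUSID.io.hdf_utils import find_dataset

    from BGlib.gmode.proc.fft import LowPassFilter  # hypothetical import path

    with h5py.File('gmode_line_data.h5', mode='r+') as h5_f:  # hypothetical file
        h5_main = find_dataset(h5_f, 'Raw_Data')[0]  # assumes this dataset name
        filts = [LowPassFilter(h5_main.shape[1], 10E+6, 200E+3)]
        sig_filt = SignalFilter(h5_main, frequency_filters=filts,
                                noise_threshold=1E-4, write_filtered=True)
        sig_filt.test()  # visual sanity check on a randomly chosen pixel
        h5_results_grp = sig_filt.compute()  # filter the full dataset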