# Source code for BGlib.be.translators.beps_ndf

# -*- coding: utf-8 -*-
"""
Created on Tue Nov  3 15:07:16 2015

@author: Suhas Somnath

"""

from __future__ import division, print_function, absolute_import, unicode_literals

import sys
from os import path, listdir, remove
from warnings import warn
import h5py
import numpy as np
import xlrd as xlreader  # To read the UDVS spreadsheet
from scipy.io.matlab import loadmat  # To load parameters stored in Matlab .mat file

from sidpy.sid import Translator
from sidpy.hdf.hdf_utils import link_h5_objects_as_attrs, write_simple_attrs
from pyUSID.processing.comp_utils import get_available_memory

from pyUSID.io.anc_build_utils import make_indices_matrix, VALUES_DTYPE, \
    INDICES_DTYPE, calc_chunks
from pyUSID.io.usi_data import USIDataset
from pyUSID.io.hdf_utils import create_indexed_group, check_if_main, print_tree

from .df_utils.be_utils import trimUDVS, getSpectroscopicParmLabel, \
    parmsToDict, generatePlotGroups, normalizeBEresponse, createSpecVals, nf32

# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that
# isinstance(obj, (str, unicode)) checks below work on both Python 2 and 3.
if sys.version_info.major == 3:
    unicode = str


class BEPSndfTranslator(Translator):
    """
    Translates Band Excitation Polarization Switching (BEPS) datasets from
    .dat files to .h5
    """

    def __init__(self, *args, **kwargs):
        super(BEPSndfTranslator, self).__init__(*args, **kwargs)
        self._verbose = False
        self.parm_dict = dict()
        self.field_mode = None
        self.spec_label = None
        self.halve_udvs_steps = None
        self.BE_wave = None
        self.BE_wave_rev = None
        self.BE_bin_inds = None
        self.udvs_mat = None
        self.udvs_labs = None
        self.udvs_units = None
        self.num_udvs_steps = None
        self.excit_type_vec = None
        self.__unique_waves__ = None
        self.__num_wave_types__ = None
        self.max_pixels = None
        self.pos_labels = None
        self.pos_mat = None
        self.pos_units = None
        self.hdf = None
        # Cap plot-group generation at 80% of currently free RAM, in MB
        self.max_ram = int(get_available_memory() * 0.8 / (1024 ** 2))

    def is_valid_file(self, data_path):
        """
        Checks whether the provided file can be read by this translator

        Parameters
        ----------
        data_path : str
            Path to raw data file

        Returns
        -------
        obj : str
            Path to file that will be accepted by the translate() function if
            this translator is indeed capable of translating the provided file.
            Otherwise, None will be returned
        """
        if not isinstance(data_path, (str, unicode)):
            raise TypeError('data_path must be a string')

        data_path = path.abspath(data_path)

        if path.isfile(data_path):
            # we only care about the folder names at this point...
            data_path, _ = path.split(data_path)

        ndf = 'newdataformat'

        _, folder_name = path.split(data_path)
        if folder_name == ndf:
            ndf_folder = data_path
        else:
            # perhaps we are looking at the folder above?
            if ndf not in listdir(path=data_path):
                return None
            ndf_folder = path.join(data_path, ndf)

        # take any file within the new data format folder I guess...
        file_path = path.join(ndf_folder, listdir(path=ndf_folder)[0])

        parm_filepath, udvs_filepath, parms_mat_path = self._parse_file_path(file_path)

        is_beps, _ = parmsToDict(parm_filepath)
        if not is_beps:
            return None
        return parm_filepath

    def translate(self, data_filepath, show_plots=True, save_plots=True,
                  do_histogram=False, verbose=False):
        """
        The main function that translates the provided file into a .h5 file

        Parameters
        ----------------
        data_filepath : String / unicode
            Absolute path of the data file (.dat)
        show_plots : Boolean (Optional. Default is True)
            Whether or not to show plots
        save_plots : Boolean (Optional. Default is True)
            Whether or not to save the generated plots
        do_histogram : Boolean (Optional. Default is False)
            Whether or not to generate and save 2D histograms of the raw data
        verbose : Boolean (Optional. default is false)
            Whether or not to print log statements

        Returns
        --------------
        h5_path : String / unicode
            Absolute path of the generated .h5 file
        """
        data_filepath = path.abspath(data_filepath)
        # Read the parameter files
        self._verbose = verbose
        if self._verbose:
            print('BEndfTranslator: Getting file paths')
        parm_filepath, udvs_filepath, parms_mat_path = self._parse_file_path(data_filepath)
        if self._verbose:
            print('BEndfTranslator: Reading Parms text file')
        isBEPS, self.parm_dict = parmsToDict(parm_filepath)
        self.parm_dict['data_type'] = 'BEPSData'
        if not isBEPS:
            warn('This is NOT a BEPS new-data-format dataset!')
            return None

        """ Find out if this is a custom experiment and whether in and out of field were acquired
        For a standard experiment where only in / out field is acquired, zeros are stored
        even for those UDVS steps without band excitation"""
        self.field_mode = self.parm_dict['VS_measure_in_field_loops']
        expt_type = self.parm_dict['VS_mode']
        self.spec_label = getSpectroscopicParmLabel(expt_type)
        std_expt = expt_type in ['DC modulation mode', 'current mode']
        self.halve_udvs_steps = False
        ignored_plt_grps = []
        if std_expt and self.field_mode != 'in and out-of-field':
            # Only one of the two field conditions was measured; the other
            # half of the UDVS steps holds no band-excitation data
            self.halve_udvs_steps = True
            if self.field_mode == 'out-of-field':
                ignored_plt_grps = ['in-field']
            else:
                ignored_plt_grps = ['out-of-field']

        h5_path = path.join(self.folder_path, self.basename + '.h5')
        if path.exists(h5_path):
            remove(h5_path)

        if self._verbose:
            print('BEndfTranslator: Preparing to read parms.mat file')
        self.BE_wave, self.BE_wave_rev, self.BE_bin_inds = self.__get_excit_wfm(parms_mat_path)

        if self._verbose:
            print('BEndfTranslator: About to read UDVS file')
        self.udvs_labs, self.udvs_units, self.udvs_mat = self.__read_udvs_table(udvs_filepath)
        # Remove the unused plot group columns before proceeding:
        self.udvs_mat, self.udvs_labs, self.udvs_units = trimUDVS(self.udvs_mat, self.udvs_labs,
                                                                  self.udvs_units, ignored_plt_grps)
        if self._verbose:
            print('BEndfTranslator: Read UDVS file')

        self.num_udvs_steps = self.udvs_mat.shape[0]
        # This is absolutely crucial for reconstructing the data chronologically
        self.excit_type_vec = (self.udvs_mat[:, 4]).astype(int)

        # First figure out how many waveforms are present in the data from the UDVS
        unique_waves = self.__get_unique_wave_types(self.excit_type_vec)
        self.__unique_waves__ = unique_waves
        self.__num_wave_types__ = len(unique_waves)
        # print self.__num_wave_types__, 'different excitation waveforms in this experiment'

        if self._verbose:
            print('BEndfTranslator: Preparing to set up parsers')
        # Preparing objects to parse the file(s)
        parsers = self.__assemble_parsers()

        # Gathering some basic details before parsing the files:
        self.max_pixels = parsers[0].get_num_pixels()
        s_pixels = np.array(parsers[0].get_spatial_pixels())
        self.pos_labels = ['Laser Spot', 'Z', 'Y', 'X']
        # Keep only the dimensions that actually vary in this experiment
        self.pos_labels = [self.pos_labels[i] for i in np.where(s_pixels > 1)[0]]
        if len(self.pos_labels) == 0:
            self.pos_labels = ['X']
            self.pos_mat = make_indices_matrix(1)
        else:
            self.pos_mat = make_indices_matrix(s_pixels[np.argwhere(s_pixels > 1)].squeeze())
        self.pos_units = ['um' for _ in range(len(self.pos_labels))]
        # self.pos_mat = np.int32(self.pos_mat)

        # Helping Eric out a bit. Remove this section at a later time:
        main_parms = dict()
        # main_parms['grid_size_x'] = self.parm_dict['grid_num_cols']
        # main_parms['grid_size_y'] = self.parm_dict['grid_num_rows']
        main_parms['grid_size_x'] = self.parm_dict['grid_num_rows']
        main_parms['grid_size_y'] = self.parm_dict['grid_num_cols']
        main_parms['experiment_date'] = self.parm_dict['File_date_and_time']
        # assuming that the experiment was completed:
        main_parms['current_position_x'] = self.parm_dict['grid_num_rows'] - 1
        main_parms['current_position_y'] = self.parm_dict['grid_num_cols'] - 1
        main_parms['data_type'] = 'BEPSData'
        main_parms['translator'] = 'NDF'

        # Writing only the root now:
        self.hdf = h5py.File(h5_path, mode='w')
        write_simple_attrs(self.hdf, main_parms)

        ########################################################
        # Reading and parsing the .dat file(s)

        self._read_data(parsers, unique_waves, show_plots, save_plots, do_histogram)

        self.hdf.close()
        return h5_path

    def _read_data(self, parsers, unique_waves, show_plots, save_plots, do_histogram):
        """
        Loops over all pixels and reads the data into the HDF5 file.

        Parameters
        ----------
        parsers : list of BEPSndfPixel
            List of parser object that will read the pixel data from the files
        unique_waves : numpy.ndarray of int
            Array denoting the unique waveforms in the experiment
        show_plots : Boolean, optional
            Should generated plots be shown during translation.  Default True
        save_plots : Boolean, optional
            Should generated plots be saved to disk during translation.  Default True
        do_histogram : Boolean, optional
            Should histograms be generated for the different plot groups.  Default False

        Returns
        -------
        None
        """
        print('Reading data file(s)')
        self.dset_index = 0
        self.ds_pixel_start_indx = 0
        for pixel_ind in range(self.max_pixels):

            if (100.0 * (pixel_ind + 1) / self.max_pixels) % 10 == 0:
                print('{} % complete'.format(int(100 * (pixel_ind + 1) / self.max_pixels)))

            # First read the next pixel from all parsers:
            current_pixels = {}
            for prsr in parsers:
                wave_type = prsr.get_wave_type()
                if self.parm_dict['VS_mode'] == 'AC modulation mode with time reversal' and \
                        self.BE_bin_inds is not None:
                    # For this mode, pass only the FFT samples at the measured bins
                    if np.sign(wave_type) == -1:
                        bin_fft = self.BE_wave[self.BE_bin_inds]
                    elif np.sign(wave_type) == 1:
                        bin_fft = self.BE_wave_rev[self.BE_bin_inds]
                else:
                    bin_fft = None

                current_pixels[wave_type] = prsr.read_pixel(bin_fft)

            if pixel_ind == 0:
                h5_refs = self.__initialize_meas_group(self.max_pixels, current_pixels)
                prev_pixels = current_pixels  # This is here only to avoid annoying warnings.
            else:
                if current_pixels[unique_waves[0]].is_different_from(prev_pixels[unique_waves[0]]):
                    # Some parameter has changed. Write current group and make new group
                    self.__close_meas_group(h5_refs, show_plots, save_plots, do_histogram)
                    self.ds_pixel_start_indx = pixel_ind
                    h5_refs = self.__initialize_meas_group(self.max_pixels - pixel_ind, current_pixels)

            # print('reading Pixel {} of {}'.format(pixel_ind,self.max_pixels))
            self.__append_pixel_data(current_pixels)

            prev_pixels = current_pixels
        self.__close_meas_group(h5_refs, show_plots, save_plots, do_histogram)

    ###################################################################################################

    def __close_meas_group(self, h5_refs, show_plots, save_plots, do_histogram):
        """
        Performs following operations :
            * Updates the number of pixels attribute in the measurement group
            * Writes position values and indices along with region references
            * Links all ancilliary datasets to the main data set
            * Writes the spatially averaged plot data

        Parameters
        ----------
        h5_refs : list of HDF references
            References to the written datasets
        show_plots : Boolean
            Whether or not to show plots
        save_plots : Boolean
            Whether or not to save the generated plots
        do_histogram : Boolean
            Whether or not to generate and save 2D histograms of the raw data

        Returns
        -------
        None
        """
        # Update the number of pixels in the attributes
        meas_grp = self.ds_main.parent
        write_simple_attrs(meas_grp, {'num_pix': self.ds_pixel_index})

        # NOTE: removed an unused ``pos_slice_dict`` construction and an unused
        # ``aux_ds_names`` list here - both were vestigial and never read.

        self.pos_vals_list = np.array(self.pos_vals_list)

        # Ensuring that the X and Y values vary from 0 to N instead of -0.5 N to + 0.5 N
        for col_ind in range(2):
            min_val = np.min(self.pos_vals_list[:, col_ind])
            self.pos_vals_list[:, col_ind] -= min_val
            self.pos_vals_list[:, col_ind] *= 1E+6  # convert to microns

        if np.max(self.pos_vals_list[:, 2]) > 1E-3:
            # Setpoint spectroscopy
            if 'Z' in self.pos_labels:
                dim_ind = self.pos_labels.index('Z')
                # TODO: Find a way to correct the labels
                # self.pos_labels[dim_ind] = 'Setpoint'
                self.pos_units[dim_ind] = 'defl V'
        else:
            # Z spectroscopy
            self.pos_vals_list[:, 2] *= 1E+6  # convert to microns

        pos_val_mat = VALUES_DTYPE(self.pos_mat[self.ds_pixel_start_indx:
                                                self.ds_pixel_start_indx + self.ds_pixel_index, :])

        for col_ind, targ_dim_name in enumerate(['X', 'Y', 'Z']):
            if targ_dim_name in self.pos_labels:
                dim_ind = self.pos_labels.index(targ_dim_name)
                # Replace indices with the x, y, z values from the pixels
                pos_val_mat[:, dim_ind] = self.pos_vals_list[:, col_ind]

        pos_inds = self.pos_mat[self.ds_pixel_start_indx:
                                self.ds_pixel_start_indx + self.ds_pixel_index, :]

        h5_chan_grp = self.ds_main.parent
        h5_pos_ind = h5_chan_grp.create_dataset('Position_Indices', data=pos_inds,
                                                dtype=INDICES_DTYPE)
        h5_pos_val = h5_chan_grp.create_dataset('Position_Values', data=pos_val_mat,
                                                dtype=VALUES_DTYPE)

        for anc_dset in [h5_pos_ind, h5_pos_val]:
            write_simple_attrs(anc_dset, {'labels': self.pos_labels,
                                          'units': self.pos_units})

        link_h5_objects_as_attrs(self.ds_main, [h5_pos_ind, h5_pos_val])

        # While we have all the references and mean data, write the plot groups as well:
        generatePlotGroups(USIDataset(self.ds_main), self.mean_resp,
                           self.folder_path, self.basename,
                           self.max_resp, self.min_resp,
                           max_mem_mb=self.max_ram,
                           spec_label=self.spec_label,
                           show_plots=show_plots, save_plots=save_plots,
                           do_histogram=do_histogram, debug=self._verbose)

        # Now that everything about this dataset is complete:
        self.dset_index += 1

    # ##################################################################################################

    def __initialize_meas_group(self, num_pix, current_pixels):
        """
        Creates and initializes the primary (and auxillary) datasets and datagroups
        to hold the raw data for the current set of experimental parameters.

        Parameters
        ----------
        num_pix : unsigned int
            Number of pixels this datagroup is expected to hold
        current_pixels : dictionary of BEPSndfPixel objects
            Extracted data for the first pixel in this group

        Returns
        ---------
        h5_refs : list of HDF5group and HDF5Dataset references
            references of the written H5 datasets
        """

        tot_bins = 0
        tot_pts = 0
        # Each wavetype can have different number of bins
        for pixl in current_pixels.values():
            tot_bins += pixl.num_bins
            tot_pts += pixl.num_bins * pixl.num_steps

        # Need to halve the number of steps when only in / out field is acquired:
        if self.halve_udvs_steps:
            tot_pts = int(tot_pts / 2)

        # Populate information from the columns within the pixels
        # such as the FFT, bin freq, indices, etc.
        bin_freqs = np.zeros(shape=tot_bins, dtype=np.float32)
        bin_inds = np.zeros(shape=tot_bins, dtype=np.uint32)
        bin_FFT = np.zeros(shape=tot_bins, dtype=np.complex64)
        exec_bin_vec = np.zeros(shape=tot_bins, dtype=np.int32)
        pixel_bins = {}  # Might be useful later
        stind = 0
        for wave_type in self.__unique_waves__:
            pixl = current_pixels[wave_type]
            exec_bin_vec[stind:stind + pixl.num_bins] = wave_type * np.ones(pixl.num_bins)
            bin_inds[stind:stind + pixl.num_bins] = pixl.BE_bin_ind
            bin_freqs[stind:stind + pixl.num_bins] = pixl.BE_bin_w
            bin_FFT[stind:stind + pixl.num_bins] = pixl.FFT_BE_wave
            pixel_bins[wave_type] = [stind, pixl.num_bins]
            stind += pixl.num_bins
        del pixl, stind

        # Make the index matrix that has the UDVS step number and bin indices
        spec_inds = np.zeros(shape=(2, tot_pts), dtype=INDICES_DTYPE)
        stind = 0
        # Need to go through the UDVS file and reconstruct chronologically
        for step_index, wave_type in enumerate(self.excit_type_vec):
            if self.halve_udvs_steps and self.udvs_mat[step_index, 2] < 1E-3:  # invalid AC amplitude
                continue  # skip
            vals = pixel_bins[wave_type]
            spec_inds[1, stind:stind + vals[1]] = step_index * np.ones(vals[1])  # UDVS step
            spec_inds[0, stind:stind + vals[1]] = np.arange(vals[0], vals[0] + vals[1])  # Bin step
            stind += vals[1]
        del stind, wave_type, step_index

        self.spec_inds = spec_inds  # will need this for plot group generation

        h5_meas_grp = create_indexed_group(self.hdf, 'Measurement')

        curr_parm_dict = self.parm_dict
        # Some very basic information that can help the processing crew
        curr_parm_dict['num_bins'] = tot_bins
        curr_parm_dict['num_pix'] = num_pix
        write_simple_attrs(h5_meas_grp, curr_parm_dict)

        h5_chan_grp = create_indexed_group(h5_meas_grp, 'Channel')
        write_simple_attrs(h5_chan_grp, {'Channel_Input': curr_parm_dict['IO_Analog_Input_1']})

        ds_ex_wfm = h5_chan_grp.create_dataset(
            'Excitation_Waveform',
            data=np.float32(np.real(np.fft.ifft(np.fft.ifftshift(self.BE_wave)))))
        ds_bin_freq = h5_chan_grp.create_dataset('Bin_Frequencies', data=bin_freqs)
        # From Matlab to Python (base 0)
        ds_bin_inds = h5_chan_grp.create_dataset('Bin_Indices', data=bin_inds - 1,
                                                 dtype=np.uint32)
        ds_bin_fft = h5_chan_grp.create_dataset('Bin_FFT', data=bin_FFT)
        ds_wfm_typ = h5_chan_grp.create_dataset('Bin_Wfm_Type', data=exec_bin_vec)
        ds_bin_steps = h5_chan_grp.create_dataset('Bin_Step',
                                                  data=np.arange(tot_bins, dtype=np.uint32))

        ds_udvs_mat = h5_chan_grp.create_dataset('UDVS', data=self.udvs_mat)
        write_simple_attrs(ds_udvs_mat, {'labels': self.udvs_labs,
                                         'units': self.udvs_units})

        ds_udvs_inds = h5_chan_grp.create_dataset('UDVS_Indices', data=self.spec_inds[1])

        actual_udvs_steps = self.num_udvs_steps
        if self.halve_udvs_steps:
            actual_udvs_steps /= 2
        if actual_udvs_steps % 1:
            raise ValueError('Actual number of UDVS steps should be an integer')
        actual_udvs_steps = int(actual_udvs_steps)

        curr_parm_dict['num_udvs_steps'] = actual_udvs_steps

        '''
        Create the Spectroscopic Values tables
        '''
        ret_vals = createSpecVals(self.udvs_mat, spec_inds, bin_freqs, exec_bin_vec,
                                  curr_parm_dict, np.array(self.udvs_labs),
                                  self.udvs_units, verbose=self._verbose)
        spec_vals, spec_inds, spec_vals_labs, spec_vals_units, spec_vals_labs_names = ret_vals

        ds_spec_vals_mat = h5_chan_grp.create_dataset('Spectroscopic_Values',
                                                      data=np.array(spec_vals),
                                                      dtype=VALUES_DTYPE)
        ds_spec_inds_mat = h5_chan_grp.create_dataset('Spectroscopic_Indices',
                                                      data=np.array(spec_inds),
                                                      dtype=INDICES_DTYPE)
        for dset in [ds_spec_inds_mat, ds_spec_vals_mat]:
            write_simple_attrs(dset, {'labels': spec_vals_labs,
                                      'units': spec_vals_units})

        """
        # Not sure what was happening here:
        for entry in spec_vals_labs_names:
            label = entry[0] + '_parameters'
            names = entry[1]
            ds_spec_mat.attrs[label] = names
            ds_spec_vals_mat.attrs[label] = names
        """

        '''
        New Method for chunking the Main_Data dataset.
        Chunking is now done in N-by-N squares of UDVS steps by pixels.
        N is determined dinamically based on the dimensions of the dataset.
        Currently it is set such that individual chunks are less than 10kB in size.

        Chris Smith -- csmith55@utk.edu
        '''
        """
        max_bins_per_pixel = np.max(list(pixel_bins.values()))
        beps_chunks = calc_chunks([num_pix, tot_pts],
                                  np.complex64(0).itemsize,
                                  unit_chunks=(1, max_bins_per_pixel))
        """
        # One row reserved now; rows are appended per pixel via resize()
        self.ds_main = h5_chan_grp.create_dataset('Raw_Data', shape=(1, tot_pts),
                                                  dtype=np.complex64,
                                                  maxshape=(None, tot_pts))
        write_simple_attrs(self.ds_main, {'quantity': 'Piezoresponse', 'units': 'V'})

        self.ds_noise = h5_chan_grp.create_dataset('Noise_Floor',
                                                   shape=(1, actual_udvs_steps),
                                                   dtype=nf32,
                                                   maxshape=(None, actual_udvs_steps))

        link_h5_objects_as_attrs(self.ds_main, [ds_spec_vals_mat, ds_spec_inds_mat])

        h5_refs = [self.ds_main, self.ds_noise, ds_udvs_mat, ds_udvs_inds, ds_bin_steps,
                   ds_wfm_typ, ds_bin_fft, ds_bin_inds, ds_bin_freq, ds_ex_wfm]

        # Storing a list of lists since we don't know how many pixels
        # we will find in this measurement group
        self.pos_vals_list = list()

        # self.dset_index += 1  # raise dset index after closing only
        self.ds_pixel_index = 0

        # Use this for plot groups:
        self.mean_resp = np.zeros(shape=tot_pts, dtype=np.complex64)

        # Used for Histograms
        self.max_resp = np.zeros(shape=num_pix, dtype=np.float32)
        self.min_resp = np.zeros(shape=num_pix, dtype=np.float32)

        return h5_refs

    # ##################################################################################################

    def __append_pixel_data(self, pixel_data):
        """
        Goes through the list of pixel objects and populates the raw dataset
        and noise dataset for this spatial pixel.

        Parameters
        ----------
        pixel_data : List of BEPSndfPixel objects
            List containing parsed data for this particular spatial pixel

        Returns
        ---------
        None
        """
        if self.__num_wave_types__ == 1 and not self.halve_udvs_steps:
            """Technically, this will be taken care of in the later (general) part but
            since this condition is more common it is worth writing for specifically"""
            zero_pix = self.__unique_waves__[0]

            data_vec = pixel_data[zero_pix].spectrogram_vec
            noise_mat = np.float32(pixel_data[zero_pix].noise_floor_mat)

            # Storing a list of lists since we don't know how many pixels we will find in this measurement group
            self.pos_vals_list.append([pixel_data[zero_pix].x_value,
                                       pixel_data[zero_pix].y_value,
                                       pixel_data[zero_pix].z_value])

        else:
            data_vec = np.zeros(shape=(self.ds_main.shape[1]), dtype=np.complex64)
            noise_mat = np.zeros(shape=(3, self.ds_noise.shape[1]), dtype=np.float32)

            internal_step_index = {}
            for wave_type in self.__unique_waves__:
                internal_step_index[wave_type] = 0

            stind = 0
            step_counter = 0
            # Go through each line in the UDVS file and reconstruct chronologically
            for step_index, wave_type in enumerate(self.excit_type_vec):
                # get the noise and data from correct pixel -> address by wave_number.
                if self.halve_udvs_steps and self.udvs_mat[step_index, 2] < 1E-3:  # invalid AC amplitude
                    # print('Step index {} was skipped'.format(step_index))
                    # Not sure why each wave type has its own counter but there must have been a good reason
                    internal_step_index[wave_type] += 1
                    continue  # skip

                data_pix = pixel_data[wave_type].spectrogram_mat
                noise_pix = pixel_data[wave_type].noise_floor_mat

                enind = stind + pixel_data[wave_type].num_bins

                data_vec[stind:enind] = data_pix[:, internal_step_index[wave_type]]
                noise_mat[:, step_counter] = np.float32(noise_pix[:, internal_step_index[wave_type]])

                stind = enind
                internal_step_index[wave_type] += 1
                step_counter += 1

            # Storing a list of lists since we don't know how many pixels we will find in this measurement group
            self.pos_vals_list.append([pixel_data[wave_type].x_value,
                                       pixel_data[wave_type].y_value,
                                       pixel_data[wave_type].z_value])

            del internal_step_index, stind, enind, step_index, wave_type, step_counter

        if self.ds_pixel_index > 0:
            # in the case of the first pixel, we already reserved zeros - no extension
            # for all other lines - we extend the dataset before writing
            self.ds_main.resize(self.ds_main.shape[0] + 1, axis=0)
            self.ds_noise.resize(self.ds_noise.shape[0] + 1, axis=0)

        self.ds_main[-1, :] = data_vec
        self.ds_noise[-1] = np.array([tuple(noise) for noise in noise_mat.T], dtype=nf32)

        self.hdf.file.flush()

        # Take mean response here (running average over pixels):
        self.mean_resp = (1 / (self.ds_pixel_index + 1)) * \
            (data_vec + self.ds_pixel_index * self.mean_resp)

        self.max_resp[self.ds_pixel_index] = np.amax(np.abs(data_vec))
        self.min_resp[self.ds_pixel_index] = np.amin(np.abs(data_vec))

        self.ds_pixel_index += 1

    ###################################################################################################

    def _parse_file_path(self, file_path):
        """
        Returns the file paths to the parms text file and UDVS spreadsheet.

        Note: This function also initializes the basename and the folder_path
        for this instance

        Parameters
        ----------
        file_path : String / unicode
            Absolute path of the any file inside the measurment folder

        Returns
        -------
        parm_filepath : String / unicode
            absolute filepath of the parms text file
        udvs_filepath : String / unicode
            absolute file path of the UDVS spreadsheet
        parms_mat_path : String / unicode
            absolute filepath of the .mat parms file
        """
        udvs_filepath = None
        folder_path, tail = path.split(file_path)

        main_folder_path = None
        for item in listdir(folder_path):
            if item == 'newdataformat':
                main_folder_path = folder_path
                self.folder_path = path.join(folder_path, item)
                break
        if main_folder_path is None:
            self.folder_path = folder_path
            # need to set the parent as the root_folder
            main_folder_path, tail = path.split(folder_path)

        parms_mat_path = None
        parm_filepath = None
        for filenames in listdir(main_folder_path):
            if filenames.endswith('.txt') and filenames.find('parm') > 0:
                parm_filepath = path.join(main_folder_path, filenames)
            elif filenames.endswith('all.mat'):
                parms_mat_path = path.join(main_folder_path, filenames)
            elif filenames.endswith('more_parms.mat') and parms_mat_path is None:
                parms_mat_path = path.join(main_folder_path, filenames)

        for filenames in listdir(self.folder_path):
            if (filenames.endswith('.xlsx') or filenames.endswith('.xls')) and \
                    filenames.find('UD_VS') > 0:
                udvs_filepath = path.join(self.folder_path, filenames)
                break

        if parm_filepath is None:
            for filenames in listdir(self.folder_path):
                if (filenames.endswith('.xls') or filenames.endswith('.xlsx')) and \
                        filenames.find('parm') > 0:
                    parm_filepath = path.join(main_folder_path, filenames)

        (tail, self.basename) = path.split(main_folder_path)

        return parm_filepath, udvs_filepath, parms_mat_path

    ###################################################################################################

    def __assemble_parsers(self):
        """
        Returns a list of BEPSndfParser objects per excitation wave type

        Returns
        ---------
        parsers: list of BEPSndfParser objects
            list of the same length as the input numpy array
        """
        parsers = []  # Maybe this needs to be a dictionary instead for easier access?
        for wave_type in self.__unique_waves__:
            filename = '_1_'
            if wave_type > 0:
                filename = '_1_' + str(wave_type) + '.dat'
            else:
                filename = '_1_r' + str(abs(wave_type)) + '.dat'
            datapath = path.join(self.folder_path, self.basename + filename)
            if not path.isfile(datapath):
                # It is possible that the folder was renamed
                # i.e. - try finding without self.basename
                temp = [this for this in listdir(self.folder_path)
                        if this.endswith(filename)]
                if len(temp) == 1:
                    warn('.dat file: {} did not have same base name as '
                         'root folder: {}'.format(temp[0], self.basename))
                    datapath = path.join(self.folder_path, temp[0])
                else:
                    raise FileNotFoundError('{} expected but not found!'
                                            ''.format(filename))
            parsers.append(BEPSndfParser(datapath, wave_type))
        return parsers

    # ##################################################################################################

    @staticmethod
    def __get_excit_wfm(filepath):
        """
        Returns the excitation BE waveform present in the more parms.mat file

        Parameters
        ------------
        filepath : String / unicode
            Absolute filepath of the .mat parameter file

        Returns
        -----------
        ex_wfm : 1D numpy float array
            Band Excitation waveform
        """
        if not path.exists(filepath):
            warn('BEPSndfTranslator - NO more_parms.mat file found')
            return np.zeros(1000, dtype=np.float32)

        if 'more_parms' in filepath:
            matread = loadmat(filepath, variable_names=['FFT_BE_wave'])
            fft_full = np.complex64(np.squeeze(matread['FFT_BE_wave']))
            bin_inds = None
            fft_full_rev = None
        else:
            matread = loadmat(filepath, variable_names=['FFT_BE_wave',
                                                        'FFT_BE_rev_wave',
                                                        'BE_bin_ind'])
            # Matlab indices are base 1; shift to base 0
            bin_inds = np.uint(np.squeeze(matread['BE_bin_ind'])) - 1
            fft_full = np.complex64(np.squeeze(matread['FFT_BE_wave']))
            fft_full_rev = np.complex64(np.squeeze(matread['FFT_BE_rev_wave']))

        return fft_full, fft_full_rev, bin_inds

    # ##################################################################################################

    @staticmethod
    def __read_udvs_table(udvs_filepath):
        """
        Reads the UDVS spreadsheet in either .xls or .xlsx format!

        Parameters
        ----------
        udvs_filepath : String / unicode
            absolute path to the spreadsheet

        Returns
        ----------
        udvs_labs : list of strings
            names of columns in the UDVS table
        udvs_units : list of strings
            units for columns in the UDVS table
        UDVS_mat : 2D numpy float array
            Contents of the UDVS table
        """
        workbook = xlreader.open_workbook(udvs_filepath)
        worksheet = workbook.sheet_by_index(0)
        udvs_labs = list()
        for col in range(worksheet.ncols):
            udvs_labs.append(str(worksheet.cell(0, col).value))
        # sometimes, the first few columns are named incorrectly. FORCE them to be named correclty:
        udvs_units = list(['' for _ in range(len(udvs_labs))])
        udvs_labs[0:5] = ['step_num', 'dc_offset', 'ac_amp', 'wave_type', 'wave_mod']
        udvs_units[0:5] = ['', 'V', 'A', '', '']

        udvs_mat = np.zeros(shape=(worksheet.nrows - 1, worksheet.ncols), dtype=np.float32)

        for row in range(1, worksheet.nrows):
            for col in range(worksheet.ncols):
                try:
                    udvs_mat[row - 1, col] = worksheet.cell(row, col).value
                except ValueError:
                    # Non-numeric cell - mark as NaN instead of failing
                    udvs_mat[row - 1, col] = float('NaN')
                except:
                    raise

        # Decrease counter of number of steps by 1 (Python base 0)
        udvs_mat[:, 0] -= 1

        return udvs_labs, udvs_units, udvs_mat

    @staticmethod
    def __get_unique_wave_types(vec):
        """
        Returns a numpy array containing the different waveforms in a BEPS experiment
        in the format: [1,-1,2,-2.....]

        Parameters
        ---------------
        vec : 1D list or 1D numpy array
            waveform types in the UDVS table

        Returns
        ----------
        uniq : 1D numpy array
            Unique waveform types in format listed above
        """
        sorted_all = np.unique(vec)
        pos_vals = sorted_all[sorted_all >= 0]
        neg_vals = sorted_all[sorted_all < 0]

        if len(pos_vals) == 0:
            return neg_vals

        if len(neg_vals) == 0:
            return pos_vals

        uniq = []
        posind = 0
        # negatives are sorted ascending, so walk them backwards
        # (most negative last) to get ascending absolute values
        negind = len(neg_vals) - 1
        while posind < len(pos_vals) or negind >= 0:

            if posind == len(pos_vals):
                # positives exhausted - append only the REMAINING negatives,
                # in ascending order of absolute value.
                # BUGFIX: previously the entire neg_vals array was appended,
                # duplicating values that were already emitted.
                uniq += list(neg_vals[negind::-1])
                break

            if negind < 0:
                # negatives exhausted - append only the remaining positives.
                # BUGFIX: previous check (negind == len(neg_vals)) could never
                # trigger since negind counts DOWN, leading to duplicates and
                # eventually an IndexError for unbalanced wave lists.
                uniq += list(pos_vals[posind:])
                break
            # Otherwise compare
            if pos_vals[posind] < abs(neg_vals[negind]):
                uniq.append(pos_vals[posind])
                posind += 1
            elif pos_vals[posind] == abs(neg_vals[negind]):
                uniq.append(pos_vals[posind])
                uniq.append(neg_vals[negind])
                posind += 1
                negind -= 1
            else:
                uniq.append(neg_vals[negind])
                negind -= 1

        return np.array(uniq)
[docs] class BEPSndfParser(object): """ An object of this class is given the responsibility to step through a BEPS new data format file and return parsed BEPSndfPixel objects.\n This class is NOT responsible for actually parsing the byte contents within each pixel.\n Each wave type is given its own Parser object since it has a file of its own """ def __init__(self, file_path, wave_type=1, scout=True): """ Initializes the BEPSndfParser object with following inputs: Parameters ----------- file_path : string or unicode Absolute path of the .dat file wave_type : int (optional. Default = 1) Integer value signifying type of the excitation waveform\n scout : Boolean (optional. Default = true) whether or not the parser should figure out basic details such as the number of pixels, and the spatial dimensionality """ self.__file_handle__ = open(file_path, "rb") self.__EOF__ = False self.__curr_Pixel__ = 0 self.__start_point__ = 0 self.__wave_type__ = wave_type self.__filesize__ = path.getsize(file_path) self.__pixel_indices__ = list() if scout: self.__scout()
[docs] def get_wave_type(self): """ Returns the excitation wave type as an integer Returns ------- wave_type : int Wave type. Positive number means chirp up, negative number is chirp down. """ return self.__wave_type__
[docs] def get_num_pixels(self): """ Returns the total number of spatial pixels. This includes X, Y, Z, Laser positions Returns ------- num_pix : unsigned int Number of pixels in this file """ return self.__num_pixels__
[docs] def get_spatial_pixels(self): """ Returns the number of steps in each spatial dimension organized from fastest to slowest varying dimension Returns ------- __num_laser_steps__ : unsigned int Number of laser steps __num_z_steps__ : unsigned int Number of height steps __num_x_steps__ : unsigned int Number of columns __num_y_steps__ : unsigned int Number of rows """ return self.__num_laser_steps__, self.__num_z_steps__, self.__num_x_steps__, self.__num_y_steps__
# Don't use this to figure out if something changes. You need pixel to previous pixel comparison
def __scout(self):
    """
    Steps through the file quickly without fully parsing it. The idea is to
    calculate the number of pixels ahead of time so that it is easier to
    parse the dataset. For phase checking, it is recommended that this
    function be modified to also keep track of the byte positions of the
    pixels so that pixels can be directly accessed if need be.

    Side effects - sets the following (name-mangling-exempt) attributes:
        __pixel_indices__   : appends the byte offset of every pixel
        __num_pixels__      : total number of pixels found
        __num_x_steps__, __num_y_steps__, __num_z_steps__, __num_bins__ :
            grid / bin sizes read from the first pixel
        __num_laser_steps__ : inferred laser-position count (only set when
            the pixel count divides evenly by the X*Y*Z grid - see note)
        __spat_dim__        : number of spatial dimensions that actually vary
        __start_point__     : reset to 0 (file offsets are in float32 words)
    """
    count = 0
    self.__num_pixels__ = 0
    while True:
        self.__pixel_indices__.append(self.__start_point__ * 4)
        self.__file_handle__.seek(self.__start_point__ * 4, 0)
        # The first float32 of every pixel holds the spectrogram length in
        # float32 words. np.fromstring was deprecated in NumPy 1.14 and
        # removed in NumPy 2.0 -> use np.frombuffer instead.
        spectrogram_length = int(np.frombuffer(self.__file_handle__.read(4), dtype='f')[0])
        if count == 0:
            # Fully parse only the very first pixel to learn grid / bin sizes
            self.__file_handle__.seek(self.__start_point__ * 4, 0)
            # .copy() keeps the vector writable: np.frombuffer returns a
            # read-only view of the bytes, but BEPSndfPixel zeroes NaNs
            # in-place in a slice of this very buffer.
            data_vec = np.frombuffer(self.__file_handle__.read(spectrogram_length * 4),
                                     dtype='f').copy()
            pix = BEPSndfPixel(data_vec, self.__wave_type__)
            self.__num_x_steps__ = pix.num_x_steps
            self.__num_y_steps__ = pix.num_y_steps
            self.__num_z_steps__ = pix.num_z_steps
            self.__num_bins__ = pix.num_bins
        count += 1
        self.__start_point__ += spectrogram_length
        if self.__filesize__ == self.__start_point__ * 4:
            self.__num_pixels__ = count
            # Laser position spectroscopy is NOT accounted for anywhere.
            # It is impossible to find out from the parms.txt, UD_VS, or the binary .dat file
            num_laser_steps = 1.0 * count / (self.__num_z_steps__ * self.__num_y_steps__ * self.__num_x_steps__)
            if num_laser_steps % 1.0 != 0:
                # NOTE(review): __num_laser_steps__ is left unset on this path,
                # so later reads of it would raise AttributeError - kept as-is
                # to preserve the original behavior.
                print('Some parameter changed inbetween. BEPS NDF Translator '
                      'does not handle this usecase at the moment')
            else:
                self.__num_laser_steps__ = int(num_laser_steps)
            break

    # NOTE(review): unreachable as written - the loop above only exits when
    # filesize == start_point * 4 exactly. Kept for parity with the original.
    if self.__filesize__ > self.__start_point__ * 4:
        self.__num_pixels__ = -1

    self.__start_point__ = 0

    # Count how many of the spatial dimensions actually vary
    spat_dim = 0
    if self.__num_z_steps__ > 1:
        # Z is varying
        spat_dim += 1
    if self.__num_y_steps__ > 1:
        # Y is varying
        spat_dim += 1
    if self.__num_x_steps__ > 1:
        # X is varying
        spat_dim += 1
    if self.__num_laser_steps__ > 1:
        # Laser spot position vector is junk in the .dat file
        # Laser position / unknown parameter varying
        spat_dim += 1
    self.__spat_dim__ = spat_dim
def read_pixel(self, bin_fft=None):
    """
    Returns a BEPSndfPixel object containing the parsed information within a
    pixel. Moves the pixel index up by one. This is where one could
    conceivably read the file in one pass instead of making 100,000 file I/Os.

    Parameters
    ----------
    bin_fft : 1D complex numpy array, optional
        FFT of the BE excitation waveform, used to override the (possibly
        mis-normalized) FFT stored within the pixel itself

    Returns
    -------
    pixel : BEPSndfPixel or int
        Object that describes the data contained within the pixel, or -1
        when no pixels remain (sentinel kept for backward compatibility
        with existing callers).
    """
    # All file offsets are in float32 words, hence the factor of 4
    if self.__filesize__ == self.__start_point__ * 4:
        print('BEPS NDF Parser - No more pixels left!')
        return -1

    self.__file_handle__.seek(self.__start_point__ * 4, 0)
    # np.fromstring was deprecated in NumPy 1.14 and removed in NumPy 2.0
    # -> use np.frombuffer instead.
    spectrogram_length = int(np.frombuffer(self.__file_handle__.read(4), dtype='f')[0])

    self.__file_handle__.seek(self.__start_point__ * 4, 0)
    # .copy() keeps the vector writable: np.frombuffer returns a read-only
    # view of the bytes, but BEPSndfPixel zeroes NaNs in-place in a slice
    # of this very buffer (deflVolt_vec).
    data_vec = np.frombuffer(self.__file_handle__.read(spectrogram_length * 4),
                             dtype='f').copy()

    self.__start_point__ += spectrogram_length
    self.__curr_Pixel__ += 1

    if self.__filesize__ == self.__start_point__ * 4:
        print('BEPS NDF Parser reached End of File')
        self.__EOF__ = True
        self.__file_handle__.close()

    return BEPSndfPixel(data_vec, abs(self.__wave_type__), bin_fft)
class BEPSndfPixel(object):
    """
    Stands for BEPS (new data format) Pixel.

    This class parses (and keeps) the stream of data contained in a single cell of a BEPS data set of the new data
    format. Access desired parameter directly without get methods.
    """

    def __init__(self, data_vec, harm=1, bin_fft=None):
        """
        Initializes the pixel instance by parsing the provided data.

        Parameters
        ----------
        data_vec : 1D float numpy array
            Data contained within each pixel
        harm: unsigned int
            Harmonic of the BE waveform. absolute value of the wave type used to normalize the response waveform.
        bin_fft : 1D complex numpy array, optional
            If provided, replaces the excitation-waveform FFT parsed from the
            pixel before the response spectrogram is normalized.
        """
        # Clamp the harmonic to the valid range [1, 3]; fall back to 1 on bad input
        harm = abs(harm)
        if harm > 3 or harm < 1:
            harm = 1
            warn('Error in BEPSndfPixel: invalid wave type / harmonic provided.')
        self.harm = harm

        # Begin parsing data:
        # data_vec[1] holds the 1-based spatial index of this pixel
        self.spatial_index = int(data_vec[1]) - 1
        # data_vec[0] holds the total length (in float32 words) of this pixel's record
        self.spectrogram_length = int(data_vec[0])

        # calculate indices for parsing
        s1 = int(data_vec[2])  # total rows in pixel
        s2 = int(data_vec[3])  # total cols in pixel
        # Everything after the two header words is an s1 x s2 matrix
        data_vec1 = data_vec[2:self.spectrogram_length]
        data_mat1 = data_vec1.reshape(s1, s2)
        spect_size1 = int(data_mat1[1, 0])  # total rows in spectrogram set
        self.num_bins = int(spect_size1 / 2)  # or, len(BE_bin_w)
        self.num_steps = int(data_mat1[1, 1])  # total cols in spectrogram set
        s3 = int(s1 - spect_size1)  # row index of beginning of spectrogram set
        s4 = int(s2 - self.num_steps)  # col index of beginning of spectrogram set

        self.wave_label = data_mat1[2, 0]  # This is useless
        self.wave_modulation_type = data_mat1[2, 1]  # this is the one with useful information
        # print 'Pixel #',self.spatial_index,' Wave label: ',self.wave_label, ', Wave Type: ', self.wave_modulation_type

        # First get the information from the columns:
        # real part of excitation waveform
        fft_be_wave_real = data_mat1[s3:s3 - 0 + self.num_bins, 1]
        # imaginary part of excitation waveform
        fft_be_wave_imag = data_mat1[s3 + self.num_bins:s3 - 0 + spect_size1, 1]

        """
        Though typecasting the combination of the real and imaginary data looks fine in HDFviewer and Spyder,
        Labview sees such data as an array of clusters having 'r' and 'i' elements
        """
        # self.FFT_BE_wave = np.complex64(fft_be_wave_real + 1j*fft_be_wave_imag)
        # complex excitation waveform! due to a problem in the acquisition software, this may not be normalized properly
        # Assemble the complex FFT by filling real/imag parts separately (see note above)
        self.FFT_BE_wave = np.zeros(self.num_bins, dtype=np.complex64)
        self.FFT_BE_wave.real = fft_be_wave_real
        self.FFT_BE_wave.imag = fft_be_wave_imag
        del fft_be_wave_real, fft_be_wave_imag

        self.BE_bin_w = data_mat1[s3:s3 - 0 + self.num_bins, 2]  # vector of band frequencies
        # vector of band indices (out of all accessible frequencies below Nyquist frequency)
        self.BE_bin_ind = data_mat1[s3 + self.num_bins:s3 - 0 + spect_size1, 2]

        # Now look at the top few rows to get more information:
        self.daq_channel = data_mat1[2, 2]
        # NOTE(review): the commented-out lines below are the "natural" row
        # assignments; the active lines deliberately swap the X and Y rows.
        # Presumably this corrects a row/column convention mismatch in the
        # acquisition software - TODO confirm against the file format spec.
        # self.num_x_steps = int(data_mat1[3, 0])
        # self.num_y_steps = int(data_mat1[4, 0])
        self.num_z_steps = int(data_mat1[5, 0])
        self.z_index = int(data_mat1[5, 1] - 1)
        # self.y_index = int(data_mat1[4, 1] - 1)
        # self.x_index = int(data_mat1[3, 1] - 1)
        self.z_value = data_mat1[5, 2]
        # self.y_value = data_mat1[4, 2]
        # self.x_value = data_mat1[3, 2]
        self.num_x_steps = int(data_mat1[4, 0])
        self.num_y_steps = int(data_mat1[3, 0])
        self.x_index = int(data_mat1[4, 1] - 1)
        self.y_index = int(data_mat1[3, 1] - 1)
        self.x_value = data_mat1[4, 2]
        self.y_value = data_mat1[3, 2]

        self.step_ind_vec = data_mat1[0, s4:]  # vector of step indices
        self.DC_off_vec = data_mat1[1, s4:]  # vector of dc offsets voltages
        self.AC_amp_vec = data_mat1[2, s4:]  # vector of ac amplitude voltages
        # matrix of noise floor data. Use this information to exclude bins during fitting
        self.noise_floor_mat = data_mat1[3:6, s4:]

        # self.plot_group_list_mat = data_mat1[6:s3-2, s4:]  # matrix of plot groups

        # Here come the optional parameter rows:
        self.deflVolt_vec = data_mat1[s3 - 2, s4:]  # vector of dc cantilever deflection
        # I think this is how the defl setpoint vec should be fixed:
        # (in-place write - requires data_vec to be writable)
        self.deflVolt_vec[np.isnan(self.deflVolt_vec)] = 0
        # so far, this vector seemed to match the DC offset vector....

        self.laser_spot_pos_vec = data_mat1[s3 - 1, s4:]  # NEVER used

        # Actual data for this pixel:
        # real part of response spectrogram
        spectrogram_real_mat = data_mat1[s3:s3 + self.num_bins, s4:]
        # imaginary part of response spectrogram
        spectrogram_imag_mat = data_mat1[s3 + self.num_bins:s3 + spect_size1, s4:]
        # Be consistent and ensure that the data is also stored as 64 bit complex as in the array creation
        # complex part of response spectrogram
        self.spectrogram_mat = np.complex64(spectrogram_real_mat + 1j * spectrogram_imag_mat)
        del spectrogram_real_mat, spectrogram_imag_mat

        # Caller-supplied FFT (if any) overrides the one parsed from the pixel
        if bin_fft is not None:
            self.FFT_BE_wave = bin_fft

        self.spectrogram_mat = normalizeBEresponse(self.spectrogram_mat, self.FFT_BE_wave, harm)

        #  Reshape as one column (its free in Python anyway):
        temp_mat = self.spectrogram_mat.transpose()
        self.spectrogram_vec = temp_mat.reshape(self.spectrogram_mat.size)

    def is_different_from(self, prev_pixel):
        """
        Compares parameters in this object with those another BEPSndfPixel object
        to tell if any parameter has changed between these pixels

        Parameters
        ----------
        prev_pixel : BEPSndfPixel object
            The other pixel object to compare this pixel to

        Returns
        -------
        is_different : Boolean
            Whether or not these pixel objects share the same parameters

        Notes
        -----
        *Typical things that change during BEPS*

        1. BE parameters:
            a. Center Frequency, Band Width - changes in the BE_bin_w
            b. Amplitude, Phase Variation, Band Edge Smoothing, Band Edge Trim - Harder to find out what happened
            exactly - FFT should show changes
            c. BE repeats, desired duration - changes in the spectrogram length?
        2. VS Parameters
            a. Amplitude, Phase shift - Changes in the AC_amp_vec / DC offset
            b. Offset, Read voltage - Shows up in the DC offset
            c. Steps per full Cycle - Changes in DC offset / AC amplitude ....
            d. Number of cycles, Cycle fraction, In and out of field - changes in the length of DC offset etc.
            e. FORC - should all show up in DC / AC amplitude
        3. IO parameters : don't change really
        4. grid parameters : cannot do anything about this.
        """
        disp_on = True

        if self.spectrogram_length != prev_pixel.spectrogram_length:
            if disp_on:
                print('Spectrogram Length changed on pixel {}'.format(self.spatial_index))
            return True

        if self.num_bins != prev_pixel.num_bins:
            if disp_on:
                print('Number of bins changed on on pixel {}'.format(self.spatial_index))
            return True

        if not np.array_equal(self.BE_bin_w, prev_pixel.BE_bin_w):
            if disp_on:
                print('Bin Frequencies changed on pixel {}'.format(self.spatial_index))
            return True

        if not np.array_equal(self.FFT_BE_wave, prev_pixel.FFT_BE_wave):
            if disp_on:
                print('BE FFT changed on pixel {}'.format(self.spatial_index))
            return True

        if not np.array_equal(self.AC_amp_vec, prev_pixel.AC_amp_vec):
            if disp_on:
                print('AC amplitude (UDVS) changed on pixel {}'.format(self.spatial_index))
            return True

        if not np.array_equal(self.DC_off_vec, prev_pixel.DC_off_vec):
            if disp_on:
                print('DC offset (UDVS) changed on pixel {}'.format(self.spatial_index))
            return True

        # I was told that this section was just garbage in the file.
        # if not np.array_equal(self.laser_spot_pos_vec, prev_pixel.laser_spot_pos_vec):
        #    print 'Laser spot position vec was different....'
        #    print self.laser_spot_pos_vec
        #    return True

        if not np.array_equal(self.deflVolt_vec, prev_pixel.deflVolt_vec):
            print('deflVolt_vec vec was different....')
            return True

        return False