Source code for BGlib.be.translators.be_odf

# -*- coding: utf-8 -*-
"""
Created on Tue Nov  3 15:24:12 2015

@author: Suhas Somnath, Stephen Jesse
"""

from __future__ import division, print_function, absolute_import, unicode_literals

from os import path, listdir, remove
import sys
import datetime
from warnings import warn
import h5py
import numpy as np
from scipy.io.matlab import loadmat  # To load parameters stored in Matlab .mat file

from .df_utils.be_utils import trimUDVS, getSpectroscopicParmLabel, parmsToDict, generatePlotGroups, \
    createSpecVals, requires_conjugate, generate_bipolar_triangular_waveform, \
    infer_bipolar_triangular_fraction_phase, nf32

from sidpy.hdf.hdf_utils import write_simple_attrs, copy_attributes
from sidpy.hdf.reg_ref import write_region_references
from sidpy.sid import Translator
from pyUSID.processing.comp_utils import get_available_memory
from pyUSID.io.anc_build_utils import INDICES_DTYPE, VALUES_DTYPE, Dimension, calc_chunks
from pyUSID.io.hdf_utils import write_ind_val_dsets, write_main_dataset, \
    create_indexed_group, write_book_keeping_attrs,\
    write_reduced_anc_dsets, get_unit_values
from pyUSID.io.usi_data import USIDataset

if sys.version_info.major == 3:
    unicode = str


[docs] class BEodfTranslator(Translator): """ Translates either the Band Excitation (BE) scan or Band Excitation Polarization Switching (BEPS) data format from the old data format(s) to .h5 """ def __init__(self, *args, **kwargs): super(BEodfTranslator, self).__init__(*args, **kwargs) self.h5_raw = None self.num_rand_spectra = kwargs.pop('num_rand_spectra', 1000) self._cores = kwargs.pop('cores', None) self.FFT_BE_wave = None self.signal_type = None self.expt_type = None self._verbose = False self.max_ram = int(get_available_memory()*0.8 / (1024 ** 2))
[docs] @staticmethod def is_valid_file(data_path): """ Checks whether the provided file can be read by this translator Parameters ---------- data_path : str Path to raw data file Returns ------- obj : str Path to file that will be accepted by the translate() function if this translator is indeed capable of translating the provided file. Otherwise, None will be returned """ if not isinstance(data_path, (str, unicode)): raise TypeError('data_path must be a string') ndf = 'newdataformat' data_path = path.abspath(data_path) if path.isfile(data_path): ext = data_path.split('.')[-1] if ext.lower() not in ['jpg', 'png', 'jpeg', 'tiff', 'mat', 'txt', 'dat', 'xls', 'xlsx']: return None # we only care about the folder names at this point... data_path, _ = path.split(data_path) # Check if the data is in the new or old format: # Check one level up: _, dir_name = path.split(data_path) if dir_name == ndf: # Though this translator could also read the files but the NDF Translator is more robust... return None # Check one level down: if ndf in listdir(data_path): # Though this translator could also read the files but the NDF Translator is more robust... return None file_path = path.join(data_path, listdir(path=data_path)[0]) _, path_dict = BEodfTranslator._parse_file_path(file_path) if any([x.find('bigtime_0') > 0 and x.endswith('.dat') for x in path_dict.values()]): # This is a G-mode Line experiment: return None if any([x.find('bigtime_0') > 0 and x.endswith('.dat') for x in path_dict.values()]): # This is a G-mode Line experiment: return None parm_found = any([piece in path_dict.keys() for piece in ['parm_txt', 'old_mat_parms']]) real_found = any([piece in path_dict.keys() for piece in ['read_real', 'write_real']]) imag_found = any([piece in path_dict.keys() for piece in ['read_imag', 'write_imag']]) if parm_found and real_found and imag_found: if 'parm_txt' in path_dict.keys(): return path_dict['parm_txt'] else: return path_dict['old_mat_parms'] else: return None
[docs] def translate(self, file_path, show_plots=True, save_plots=True, do_histogram=False, verbose=False): """ Translates .dat data file(s) to a single .h5 file Parameters ------------- file_path : String / Unicode Absolute file path for one of the data files. It is assumed that this file is of the OLD data format. show_plots : (optional) Boolean Whether or not to show intermediate plots save_plots : (optional) Boolean Whether or not to save plots to disk do_histogram : (optional) Boolean Whether or not to construct histograms to visualize data quality. Note - this takes a fair amount of time verbose : (optional) Boolean Whether or not to print statements Returns ---------- h5_path : String / Unicode Absolute path of the resultant .h5 file """ self._verbose = verbose file_path = path.abspath(file_path) (folder_path, basename) = path.split(file_path) (basename, path_dict) = self._parse_file_path(file_path) h5_path = path.join(folder_path, basename + '.h5') tot_bins_multiplier = 1 udvs_denom = 2 if 'parm_txt' in path_dict.keys(): if self._verbose: print('\treading parameters from text file') isBEPS, parm_dict = parmsToDict(path_dict['parm_txt']) elif 'old_mat_parms' in path_dict.keys(): if self._verbose: print('\treading parameters from old mat file') parm_dict = self._get_parms_from_old_mat(path_dict['old_mat_parms'], verbose=self._verbose) if parm_dict['VS_steps_per_full_cycle'] == 0: isBEPS=False else: isBEPS=True else: raise FileNotFoundError('No parameters file found! Cannot ' 'translate this dataset!') # Initial text files named some parameters differently: for case in [('VS_mode', 'AC modulation mode', 'AC modulation mode with time reversal'), ('VS_mode', 'load Arbitrary VS Wave from text file', 'load user defined VS Wave from file'), ('BE_phase_content', 'chirp', 'chirp-sinc hybrid'),]: key, wrong_val, corr_val = case if key not in parm_dict.keys(): continue if parm_dict[key] == wrong_val: warn('Updating parameter "{}" from invalid value of "{}" to ' '"{}"'.format(key, wrong_val, corr_val)) parm_dict[key] = corr_val # Some .mat files did not set correct values to some parameters: for case in [('BE_amplitude_[V]', 1E-2, 0.5151), ('VS_amplitude_[V]', 1E-2, 0.9876)]: key, min_val, new_val = case if key not in parm_dict.keys(): continue if parm_dict[key] < min_val: warn('Updating parameter "{}" from invalid value of {} to {}' ''.format(key, parm_dict[key], new_val)) parm_dict[key] = new_val if self._verbose: keys = list(parm_dict.keys()) keys.sort() print('\tExperiment parameters:') for key in keys: print('\t\t{} : {}'.format(key, parm_dict[key])) print('\n\tisBEPS = {}'.format(isBEPS)) if parm_dict['BE_bins_per_band'] > 300: raise ValueError('Too many BE bins per band: {}. Translation would' ' not have failed but resultant data would be ' 'nonsensical' ''.format(parm_dict['BE_bins_per_band'])) ignored_plt_grps = [] if isBEPS: parm_dict['data_type'] = 'BEPSData' field_mode = parm_dict['VS_measure_in_field_loops'] std_expt = parm_dict['VS_mode'] != 'load user defined VS Wave from file' if not std_expt: raise ValueError('This translator does not handle user defined voltage spectroscopy') spec_label = getSpectroscopicParmLabel(parm_dict['VS_mode']) if parm_dict['VS_mode'] in ['DC modulation mode', 'current mode']: if field_mode == 'in and out-of-field': tot_bins_multiplier = 2 udvs_denom = 1 else: if field_mode == 'out-of-field': ignored_plt_grps = ['in-field'] else: ignored_plt_grps = ['out-of-field'] else: tot_bins_multiplier = 1 udvs_denom = 1 else: spec_label = 'None' parm_dict['data_type'] = 'BELineData' # Check file sizes: if self._verbose: print('\tChecking sizes of real and imaginary data files') if 'read_real' in path_dict.keys(): real_size = path.getsize(path_dict['read_real']) imag_size = path.getsize(path_dict['read_imag']) else: real_size = path.getsize(path_dict['write_real']) imag_size = path.getsize(path_dict['write_imag']) if real_size != imag_size: raise ValueError("Real and imaginary file sizes do not match!") if real_size == 0: raise ValueError('Real and imaginary files were empty') # Check here if a second channel for current is present # Look for the file containing the current data if self._verbose: print('\tLooking for secondary channels') file_names = listdir(folder_path) aux_files = [] current_data_exists = False for fname in file_names: if 'AI2' in fname: if 'write' in fname: current_file = path.join(folder_path, fname) current_data_exists=True aux_files.append(path.join(folder_path, fname)) add_pix = False num_rows = int(parm_dict['grid_num_rows']) num_cols = int(parm_dict['grid_num_cols']) if self._verbose: print('\tRows: {}, Cols: {}'.format(num_rows, num_cols)) num_pix = num_rows * num_cols tot_bins = real_size / (num_pix * 4) # Check for case where only a single pixel is missing. if num_pix == 1: check_bins = real_size / (num_pix * 4) else: check_bins = real_size / ((num_pix - 1) * 4) if self._verbose: print('\tChecking bins: Total: {}, actual: {}'.format(tot_bins, check_bins)) if tot_bins % 1 and check_bins % 1: raise ValueError('Aborting! Some parameter appears to have ' 'changed in-between') elif not tot_bins % 1: # Everything's ok pass elif not check_bins % 1: tot_bins = check_bins warn('Warning: A pixel seems to be missing from the data. ' 'File will be padded with zeros.') add_pix = True # This would crash and fail later if not fixed here # I don't like this hacky approach to solve this problem if isBEPS and tot_bins % 1 == 0 and parm_dict['VS_mode'] != 'Custom': bins_per_step = parm_dict['FORC_num_of_FORC_cycles'] * \ parm_dict['VS_number_of_cycles'] * \ parm_dict['VS_steps_per_full_cycle'] * \ parm_dict['BE_bins_per_band'] if verbose: print('\t\tNumber of bins per step: calculated: {}, actual {}' ''.format(bins_per_step, tot_bins)) if bins_per_step > 0: if bins_per_step < tot_bins and tot_bins / bins_per_step % 1 == 0: scale = int(tot_bins / bins_per_step) warn('Number of actual ({}) bins per step {}X larger than ' 'calculated ({}) values. Will scale VS cycles to get ' 'number of bins to match' ''.format(tot_bins, scale, bins_per_step)) parm_dict['VS_number_of_cycles'] *= scale else: if verbose: print('\t\tUnable to calculate number of bins per step ' 'since one or more parameters were 0') tot_bins = int(tot_bins) * tot_bins_multiplier if isBEPS: if self._verbose: print('\tBuilding UDVS table for BEPS') UDVS_labs, UDVS_units, UDVS_mat = self._build_udvs_table(parm_dict) if self._verbose: print('\tTrimming UDVS table to remove unused plot group columns') UDVS_mat, UDVS_labs, UDVS_units = trimUDVS(UDVS_mat, UDVS_labs, UDVS_units, ignored_plt_grps) old_spec_inds = np.zeros(shape=(2, tot_bins), dtype=INDICES_DTYPE) # Will assume that all excitation waveforms have same num of bins num_actual_udvs_steps = UDVS_mat.shape[0] / udvs_denom bins_per_step = tot_bins / num_actual_udvs_steps if self._verbose: print('\t# UDVS steps: {}, # bins/step: {}' ''.format(num_actual_udvs_steps, bins_per_step)) if bins_per_step % 1: print('UDVS mat shape: {}, total bins: {}, bins per step: {}'.format(UDVS_mat.shape, tot_bins, bins_per_step)) raise ValueError('Non integer number of bins per step!') bins_per_step = int(bins_per_step) num_actual_udvs_steps = int(num_actual_udvs_steps) if len(np.unique(UDVS_mat[:, 2])) == 0: raise ValueError('No non-zero rows in AC amplitude') stind = 0 for step_index in range(UDVS_mat.shape[0]): if UDVS_mat[step_index, 2] < 1E-3: # invalid AC amplitude continue # Bin step old_spec_inds[0, stind:stind + bins_per_step] = np.arange(bins_per_step, dtype=INDICES_DTYPE) # UDVS step old_spec_inds[1, stind:stind + bins_per_step] = step_index * np.ones(bins_per_step, dtype=INDICES_DTYPE) stind += bins_per_step del stind, step_index else: # BE Line if self._verbose: print('\tPreparing supporting variables since BE-Line') self.signal_type = 1 self.expt_type = 1 # Stephen has not used this index for some reason num_actual_udvs_steps = 1 bins_per_step = tot_bins UDVS_labs = ['step_num', 'dc_offset', 'ac_amp', 'wave_type', 'wave_mod', 'be-line'] UDVS_units = ['', 'V', 'A', '', '', ''] UDVS_mat = np.array([1, 0, parm_dict['BE_amplitude_[V]'], 1, 1, 1], dtype=np.float32).reshape(1, len(UDVS_labs)) old_spec_inds = np.vstack((np.arange(tot_bins, dtype=INDICES_DTYPE), np.zeros(tot_bins, dtype=INDICES_DTYPE))) if 'parm_mat' in path_dict.keys(): if self._verbose: print('\treading BE arrays from parameters text file') bin_inds, bin_freqs, bin_FFT, ex_wfm = self._read_parms_mat(path_dict['parm_mat'], isBEPS) elif 'old_mat_parms' in path_dict.keys(): if self._verbose: print('\treading BE arrays from old mat text file') bin_inds, bin_freqs, bin_FFT, ex_wfm, dc_amp_vec = self._read_old_mat_be_vecs(path_dict['old_mat_parms'], verbose=verbose) else: warn('No secondary parameters file (.mat) provided. Generating ' 'dummy BE arrays') band_width = parm_dict['BE_band_width_[Hz]'] * (0.5 - parm_dict['BE_band_edge_trim']) st_f = parm_dict['BE_center_frequency_[Hz]'] - band_width en_f = parm_dict['BE_center_frequency_[Hz]'] + band_width bin_freqs = np.linspace(st_f, en_f, bins_per_step, dtype=np.float32) if verbose: print('\tGenerating BE arrays of length: ' '{}'.format(bins_per_step)) bin_inds = np.zeros(shape=bins_per_step, dtype=np.int32) bin_FFT = np.zeros(shape=bins_per_step, dtype=np.complex64) ex_wfm = np.zeros(shape=bins_per_step, dtype=np.float32) # Forcing standardized datatypes: bin_inds = np.int32(bin_inds) bin_freqs = np.float32(bin_freqs) bin_FFT = np.complex64(bin_FFT) ex_wfm = np.float32(ex_wfm) self.FFT_BE_wave = bin_FFT # legacy parmeters inserted for BEAM parm_dict['num_bins'] = tot_bins parm_dict['num_pix'] = num_pix parm_dict['num_udvs_steps'] = num_actual_udvs_steps parm_dict['num_steps'] = num_actual_udvs_steps if self._verbose: print('\tPreparing UDVS slices for region references') udvs_slices = dict() for col_ind, col_name in enumerate(UDVS_labs): udvs_slices[col_name] = (slice(None), slice(col_ind, col_ind + 1)) # Need to add the Bin Waveform type - infer from UDVS exec_bin_vec = self.signal_type * np.ones(len(bin_inds), dtype=np.int32) if self.expt_type == 2: if self._verbose: print('\tExperiment type = 2. Doubling BE vectors') exec_bin_vec = np.hstack((exec_bin_vec, -1 * exec_bin_vec)) bin_inds = np.hstack((bin_inds, bin_inds)) bin_freqs = np.hstack((bin_freqs, bin_freqs)) # This is wrong but I don't know what else to do bin_FFT = np.hstack((bin_FFT, bin_FFT)) # Create Spectroscopic Values and Spectroscopic Values Labels datasets # This is an old and legacy way of doing things. Ideally, all we would need ot do is just get the unit values if self._verbose: print('\tCalculating spectroscopic values') ret_vals = createSpecVals(UDVS_mat, old_spec_inds, bin_freqs, exec_bin_vec, parm_dict, UDVS_labs, UDVS_units, verbose=verbose) spec_vals, spec_inds, spec_vals_labs, spec_vals_units, spec_vals_labs_names = ret_vals if self._verbose: print('\t\tspec_vals_labs: {}'.format(spec_vals_labs)) unit_vals = get_unit_values(spec_inds, spec_vals, all_dim_names=spec_vals_labs, is_spec=True, verbose=False) print('\tUnit spectroscopic values') for key, val in unit_vals.items(): print('\t\t{} : length: {}, values:\n\t\t\t{}'.format(key, len(val), val)) if spec_inds.shape[1] != tot_bins: raise ValueError('Second axis of spectroscopic indices: {} not ' 'matching with second axis of the expected main ' 'dataset: {}'.format(spec_inds.shape, tot_bins)) # Not sure what is happening here but this should work. spec_dim_dict = dict() for entry in spec_vals_labs_names: spec_dim_dict[entry[0] + '_parameters'] = entry[1] spec_vals_slices = dict() for row_ind, row_name in enumerate(spec_vals_labs): spec_vals_slices[row_name] = (slice(row_ind, row_ind + 1), slice(None)) if path.exists(h5_path): if self._verbose: print('\tRemoving existing / old translated file: ' + h5_path) remove(h5_path) # First create the file h5_f = h5py.File(h5_path, mode='w') # Then write root level attributes global_parms = dict() global_parms['grid_size_x'] = parm_dict['grid_num_cols'] global_parms['grid_size_y'] = parm_dict['grid_num_rows'] try: global_parms['experiment_date'] = parm_dict['File_date_and_time'] except KeyError: global_parms['experiment_date'] = '1:1:1' # assuming that the experiment was completed: global_parms['current_position_x'] = parm_dict['grid_num_cols'] - 1 global_parms['current_position_y'] = parm_dict['grid_num_rows'] - 1 global_parms['data_type'] = parm_dict['data_type'] global_parms['translator'] = 'ODF' if self._verbose: print('\tWriting attributes to HDF5 file root') write_simple_attrs(h5_f, global_parms) write_book_keeping_attrs(h5_f) # Then create the measurement group h5_meas_group = create_indexed_group(h5_f, 'Measurement') # Write attributes at the measurement group level if self._verbose: print('\twriting attributes to Measurement group') write_simple_attrs(h5_meas_group, parm_dict) # Create the Channel group h5_chan_grp = create_indexed_group(h5_meas_group, 'Channel') # Write channel group attributes write_simple_attrs(h5_chan_grp, {'Channel_Input': 'IO_Analog_Input_1', 'channel_type': 'BE'}) # Now the datasets! if self._verbose: print('\tCreating ancillary datasets') h5_chan_grp.create_dataset('Excitation_Waveform', data=ex_wfm) h5_udvs = h5_chan_grp.create_dataset('UDVS', data=UDVS_mat) # TODO: Avoid using region references in USID write_region_references(h5_udvs, udvs_slices, add_labels_attr=True, verbose=self._verbose) write_simple_attrs(h5_udvs, {'units': UDVS_units}, verbose=False) h5_chan_grp.create_dataset('UDVS_Indices', data=old_spec_inds[1]) h5_chan_grp.create_dataset('Bin_Step', data=np.arange(bins_per_step, dtype=INDICES_DTYPE), dtype=INDICES_DTYPE) h5_chan_grp.create_dataset('Bin_Indices', data=bin_inds, dtype=INDICES_DTYPE) h5_chan_grp.create_dataset('Bin_Frequencies', data=bin_freqs) h5_chan_grp.create_dataset('Bin_FFT', data=bin_FFT) h5_chan_grp.create_dataset('Bin_Wfm_Type', data=exec_bin_vec) if self._verbose: print('\tWriting Position datasets') pos_dims = [Dimension('X', 'm', np.arange(num_cols)), Dimension('Y', 'm', np.arange(num_rows))] h5_pos_ind, h5_pos_val = write_ind_val_dsets(h5_chan_grp, pos_dims, is_spectral=False, verbose=self._verbose) if self._verbose: print('\tPosition datasets of shape: {}'.format(h5_pos_ind.shape)) if self._verbose: print('\tWriting Spectroscopic datasets of shape: {}'.format(spec_inds.shape)) h5_spec_inds = h5_chan_grp.create_dataset('Spectroscopic_Indices', data=spec_inds, dtype=INDICES_DTYPE) h5_spec_vals = h5_chan_grp.create_dataset('Spectroscopic_Values', data=np.array(spec_vals), dtype=VALUES_DTYPE) for dset in [h5_spec_inds, h5_spec_vals]: # TODO: This is completely unnecessary write_region_references(dset, spec_vals_slices, add_labels_attr=True, verbose=self._verbose) write_simple_attrs(dset, {'units': spec_vals_units}, verbose=False) write_simple_attrs(dset, spec_dim_dict) # Noise floor should be of shape: (udvs_steps x 3 x positions) if self._verbose: print('\tWriting noise floor dataset') h5_chan_grp.create_dataset('Noise_Floor', (num_pix, num_actual_udvs_steps), dtype=nf32) """ New Method for chunking the Main_Data dataset. Chunking is now done in N-by-N squares of UDVS steps by pixels. N is determined dynamically based on the dimensions of the dataset. Currently it is set such that individual chunks are less than 10kB in size. Chris Smith -- csmith55@utk.edu """ BEPS_chunks = calc_chunks([num_pix, tot_bins], np.complex64(0).itemsize, unit_chunks=(1, bins_per_step)) if self._verbose: print('\tHDF5 dataset will have chunks of size: {}'.format(BEPS_chunks)) print('\tCreating empty main dataset of shape: ({}, {})'.format(num_pix, tot_bins)) self.h5_raw = write_main_dataset(h5_chan_grp, (num_pix, tot_bins), 'Raw_Data', 'Piezoresponse', 'V', None, None, dtype=np.complex64, h5_pos_inds=h5_pos_ind, h5_pos_vals=h5_pos_val, h5_spec_inds=h5_spec_inds, h5_spec_vals=h5_spec_vals, verbose=self._verbose) if self._verbose: print('\tReading data from binary data files into raw HDF5') self._read_data(parm_dict, path_dict, real_size, isBEPS, add_pix) if self._verbose: print('\tGenerating plot groups') generatePlotGroups(self.h5_raw, self.mean_resp, folder_path, basename, self.max_resp, self.min_resp, max_mem_mb=self.max_ram, spec_label=spec_label, show_plots=show_plots, save_plots=save_plots, do_histogram=do_histogram, debug=self._verbose) if self._verbose: print('\tUpgrading to USIDataset') self.h5_raw = USIDataset(self.h5_raw) # Go ahead and read the current data in the second (current) channel if current_data_exists: #If a .dat file matches if self._verbose: print('\tReading data in secondary channels (current)') self._read_secondary_channel(h5_meas_group, aux_files) if self._verbose: print('\tClosing HDF5 file') h5_f.close() return h5_path
def _read_data(self, parm_dict, path_dict, real_size, isBEPS, add_pix): """ Checks if the data is BEPS or BELine and calls the correct function to read the data from file Parameters ---------- parm_dict : dict Experimental parameters path_dict : dict Dictionary of data files to be read real_size : dict Size of each data file isBEPS : boolean Is the data BEPS add_pix : boolean Does the reader need to add extra pixels to the end of the dataset Returns ------- None """ # Now read the raw data files: if not isBEPS: # Do this for all BE-Line (always small enough to read in one shot) if self._verbose: print('\t\tReading all raw data for BE-Line in one shot') self._quick_read_data(path_dict['read_real'], path_dict['read_imag'], parm_dict['num_udvs_steps']) elif real_size < self.max_ram and \ parm_dict['VS_measure_in_field_loops'] == 'out-of-field': # Do this for out-of-field BEPS ONLY that is also small (256 MB) if self._verbose: print('\t\tReading all raw BEPS (out-of-field) data at once') self._quick_read_data(path_dict['read_real'], path_dict['read_imag'], parm_dict['num_udvs_steps']) elif real_size < self.max_ram and \ parm_dict['VS_measure_in_field_loops'] == 'in-field': # Do this for in-field only if self._verbose: print('\t\tReading all raw BEPS (in-field only) data at once') self._quick_read_data(path_dict['write_real'], path_dict['write_imag'], parm_dict['num_udvs_steps']) else: # Large BEPS datasets OR those with in-and-out of field if self._verbose: print('\t\tReading all raw data for in-and-out-of-field OR ' 'very large file one pixel at a time') self._read_beps_data(path_dict, parm_dict['num_udvs_steps'], parm_dict['VS_measure_in_field_loops'], add_pix) self.h5_raw.file.flush() def _read_beps_data(self, path_dict, udvs_steps, mode, add_pixel=False): """ Reads the imaginary and real data files pixelwise and writes to the H5 file Parameters -------------------- path_dict : dictionary Dictionary containing the absolute paths of the real and imaginary data files udvs_steps : unsigned int Number of UDVS steps mode : String / Unicode 'in-field', 'out-of-field', or 'in and out-of-field' add_pixel : boolean. (Optional; default is False) If an empty pixel worth of data should be written to the end Returns -------------------- None """ print('---- reading pixel-by-pixel ----------') bytes_per_pix = self.h5_raw.shape[1] * 4 step_size = self.h5_raw.shape[1] / udvs_steps if mode == 'out-of-field': parsers = [BEodfParser(path_dict['read_real'], path_dict['read_imag'], self.h5_raw.shape[0], bytes_per_pix)] elif mode == 'in-field': parsers = [BEodfParser(path_dict['write_real'], path_dict['write_imag'], self.h5_raw.shape[0], bytes_per_pix)] elif mode == 'in and out-of-field': # each file will only have half the udvs steps: if 0.5 * udvs_steps % 1: raise ValueError('Odd number of UDVS') udvs_steps = int(0.5 * udvs_steps) # be careful - each pair contains only half the necessary bins - so read half parsers = [BEodfParser(path_dict['write_real'], path_dict['write_imag'], self.h5_raw.shape[0], int(bytes_per_pix / 2)), BEodfParser(path_dict['read_real'], path_dict['read_imag'], self.h5_raw.shape[0], int(bytes_per_pix / 2))] if step_size % 1: raise ValueError('strange number of bins per UDVS step. Exiting') step_size = int(step_size) rand_spectra = self._get_random_spectra(parsers, self.h5_raw.shape[0], udvs_steps, step_size, num_spectra=self.num_rand_spectra) take_conjugate = requires_conjugate(rand_spectra, cores=self._cores) self.mean_resp = np.zeros(shape=(self.h5_raw.shape[1]), dtype=np.complex64) self.max_resp = np.zeros(shape=(self.h5_raw.shape[0]), dtype=np.float32) self.min_resp = np.zeros(shape=(self.h5_raw.shape[0]), dtype=np.float32) numpix = self.h5_raw.shape[0] """ Don't try to do the last step if a pixel is missing. This will be handled after the loop. """ if add_pixel: numpix -= 1 for pix_indx in range(numpix): if self.h5_raw.shape[0] > 5: if pix_indx % int(round(self.h5_raw.shape[0] / 10)) == 0: print('Reading... {} complete'.format(round(100 * pix_indx / self.h5_raw.shape[0]))) # get the raw stream from each parser pxl_data = list() for prsr in parsers: pxl_data.append(prsr.read_pixel()) # interleave if both in and out of field # we are ignoring user defined possibilities... if mode == 'in and out-of-field': in_fld = pxl_data[0] out_fld = pxl_data[1] in_fld_2 = in_fld.reshape(udvs_steps, step_size) out_fld_2 = out_fld.reshape(udvs_steps, step_size) raw_mat = np.empty((udvs_steps * 2, step_size), dtype=out_fld.dtype) raw_mat[0::2, :] = in_fld_2 raw_mat[1::2, :] = out_fld_2 raw_vec = raw_mat.reshape(in_fld.size + out_fld.size).transpose() else: raw_vec = pxl_data[0] # only one parser self.max_resp[pix_indx] = np.max(np.abs(raw_vec)) self.min_resp[pix_indx] = np.min(np.abs(raw_vec)) self.mean_resp = (1 / (pix_indx + 1)) * (raw_vec + pix_indx * self.mean_resp) if take_conjugate: raw_vec = np.conjugate(raw_vec) self.h5_raw[pix_indx, :] = np.complex64(raw_vec[:]) self.h5_raw.file.flush() # Add zeros to main_data for the missing pixel. if add_pixel: self.h5_raw[-1, :] = 0 + 0j print('---- Finished reading files -----') def _quick_read_data(self, real_path, imag_path, udvs_steps): """ Returns information about the excitation BE waveform present in the .mat file Parameters ----------- real_path : String / Unicode Absolute file path of the real data file imag_path : String / Unicode Absolute file path of the real data file udvs_steps : unsigned int Number of UDVS steps """ parser = BEodfParser(real_path, imag_path, self.h5_raw.shape[0], self.h5_raw.shape[1] * 4) step_size = self.h5_raw.shape[1] / udvs_steps rand_spectra = self._get_random_spectra([parser], self.h5_raw.shape[0], udvs_steps, step_size, num_spectra=self.num_rand_spectra, verbose=self._verbose) if self._verbose: print('\t\t\tChecking if conjugate is required') take_conjugate = requires_conjugate(rand_spectra, cores=self._cores) raw_vec = parser.read_all_data() if take_conjugate: if self._verbose: print('\t'*4 + 'Taking conjugate for positive quality factors') raw_vec = np.conjugate(raw_vec) if raw_vec.shape != np.prod(self.h5_raw.shape): percentage_padded = 100 * (np.prod(self.h5_raw.shape) - raw_vec.shape) / np.prod(self.h5_raw.shape) warn('Warning! Raw data length {} is not matching placeholder length {}. ' 'Padding zeros for {}% of the data!'.format(raw_vec.shape, np.prod(self.h5_raw.shape), percentage_padded)) padded_raw_vec = np.zeros(np.prod(self.h5_raw.shape), dtype = np.complex64) padded_raw_vec[:raw_vec.shape[0]] = raw_vec raw_mat = padded_raw_vec.reshape(self.h5_raw.shape[0], self.h5_raw.shape[1]) else: raw_mat = raw_vec.reshape(self.h5_raw.shape[0], self.h5_raw.shape[1]) # Write to the h5 dataset: self.mean_resp = np.mean(raw_mat, axis=0) self.max_resp = np.amax(np.abs(raw_mat), axis=0) self.min_resp = np.amin(np.abs(raw_mat), axis=0) self.h5_raw[:, :] = np.complex64(raw_mat) self.h5_raw.file.flush() print('---- Finished reading files -----') @staticmethod def _parse_file_path(data_filepath): """ Returns the basename and a dictionary containing the absolute file paths for the real and imaginary data files, text and mat parameter files in a dictionary Parameters -------------------- data_filepath: String / Unicode Absolute path of any file in the same directory as the .dat files Returns -------------------- basename : String / Unicode Basename of the dataset path_dict : Dictionary Dictionary containing absolute paths of all necessary data and parameter files """ (folder_path, basename) = path.split(data_filepath) (super_folder, basename) = path.split(folder_path) if basename.endswith('_d') or basename.endswith('_c'): # Old old data format where the folder ended with a _d or _c to denote a completed spectroscopic run basename = basename[:-2] """ A single pair of real and imaginary files are / were generated for: BE-Line and BEPS (compiled version only generated out-of-field or 'read') Two pairs of real and imaginary files were generated for later BEPS datasets These have 'read' and 'write' prefixes to denote out or in field respectively """ path_dict = dict() for file_name in listdir(folder_path): abs_path = path.join(folder_path, file_name) if file_name.endswith('.txt') and file_name.find('parm') > 0: path_dict['parm_txt'] = abs_path elif file_name.find('.mat') > 0: if file_name.find('more_parms') > 0: path_dict['parm_mat'] = abs_path elif file_name == (basename + '.mat'): path_dict['old_mat_parms'] = abs_path elif file_name.endswith('.dat'): # Need to account for the second AI channel here file_tag = 'read' if file_name.find('write') > 0: file_tag = 'write' if file_name.find('real') > 0: file_tag += '_real' elif file_name.find('imag') > 0: file_tag += '_imag' path_dict[file_tag] = abs_path return basename, path_dict def _read_secondary_channel(self, h5_meas_group, aux_file_path): """ Reads secondary channel stored in AI .mat file Currently works for in-field measurements only, but should be updated to include both in and out of field measurements Parameters ----------- h5_meas_group : h5 group Reference to the Measurement group aux_file_path : String / Unicode Absolute file path of the secondary channel file. """ if self._verbose: print('\t---------- Reading Secondary Channel ----------') if isinstance(aux_file_path, (list, tuple)): aux_file_paths = aux_file_path else: aux_file_paths = list(aux_file_path) is_in_out_field = 'Field' in self.h5_raw.spec_dim_labels if not is_in_out_field and len(aux_file_paths) > 1: # TODO: Find a better way to handle this warn('\t\tField was not varied but found more than one file for ' 'secondary channel: {}.\n\t\tResults will be overwritten' ''.format([path.split(item)[-1] for item in aux_file_paths])) elif is_in_out_field and len(aux_file_paths) == 1: warn('\t\tField was varied but only one data file for secondary' 'channel was found. Half the data will be zeros') spectral_len = 1 for dim_name, dim_size in zip(self.h5_raw.spec_dim_labels, self.h5_raw.spec_dim_sizes): if dim_name == 'Frequency': continue spectral_len = spectral_len * dim_size num_pix = self.h5_raw.shape[0] if self._verbose: print('\t\tExpecting this channel to be of shape: ({}, {})' ''.format(num_pix, spectral_len)) print('\t\tis_in_out_field: {}'.format(is_in_out_field)) # create a new channel h5_current_channel_group = create_indexed_group(h5_meas_group, 'Channel') # Copy attributes from the main channel copy_attributes(self.h5_raw.parent, h5_current_channel_group) # Modify attributes that are different write_simple_attrs(h5_current_channel_group, {'Channel_Input': 'IO_Analog_Input_2', 'channel_type': 'Current'}, verbose=False) # Get the reduced dimensions ret_vals = write_reduced_anc_dsets(h5_current_channel_group, self.h5_raw.h5_spec_inds, self.h5_raw.h5_spec_vals, 'Frequency', is_spec=True) h5_current_spec_inds, h5_current_spec_values = ret_vals if self._verbose: print('\t\tCreated groups, wrote attributes and spec datasets') h5_current_main = write_main_dataset(h5_current_channel_group, # parent HDF5 group (num_pix, spectral_len), # shape of Main dataset 'Raw_Data', # Name of main dataset 'Current', # Physical quantity contained in Main dataset 'nA', # Units for the physical quantity None, # Position dimensions None, # Spectroscopic dimensions h5_pos_inds=self.h5_raw.h5_pos_inds, h5_pos_vals=self.h5_raw.h5_pos_vals, h5_spec_inds=h5_current_spec_inds, h5_spec_vals=h5_current_spec_values, dtype=np.float32, # data type / precision main_dset_attrs={'IO_rate': 4E+6, 'Amplifier_Gain': 9}, verbose=self._verbose) if self._verbose: print('\t\tCreated empty main dataset:\n{}' ''.format(h5_current_main)) if is_in_out_field: if self._verbose: print('\t\tHalving the spectral length per binary file to: {} ' 'since this measurement has in and out of field' ''.format(spectral_len // 2)) spectral_len = spectral_len // 2 # calculate the # positions that can be stored in memory in one go. b_per_position = np.float32(0).itemsize * spectral_len max_pos_per_read = int(np.floor((get_available_memory()) / b_per_position)) if self._verbose: print('\t\tAllowed to read {} pixels per chunk' ''.format(max_pos_per_read)) print('\t\tStarting to read raw binary data') # Open the read and write files and write them to the hdf5 file for aux_file in aux_file_paths: if 'write' in aux_file: infield = True else: infield = False if self._verbose: print('\t' * 3 + 'Reading file: {}'.format(aux_file)) cur_file = open(aux_file, "rb") start_pix = 0 while start_pix < num_pix: cur_file.seek(start_pix * b_per_position, 0) end_pix = min(num_pix, start_pix + max_pos_per_read) pos_to_read = end_pix - start_pix bytes_to_read = pos_to_read * b_per_position if self._verbose: print('\t' * 4 + 'Reading pixels {} to {} - {} bytes' ''.format(start_pix, end_pix, bytes_to_read)) cur_data = np.frombuffer(cur_file.read(bytes_to_read), dtype='f') if self._verbose: print('\t' * 4 + 'Read vector of shape: {}' ''.format(cur_data.shape)) print('\t' * 4 + 'Reshaping to ({}, {})' ''.format(pos_to_read, spectral_len)) data_2d = cur_data.reshape(pos_to_read, spectral_len) # Write to h5 if is_in_out_field: if infield: h5_current_main[start_pix:end_pix, ::2] = data_2d else: h5_current_main[start_pix:end_pix, 1::2] = data_2d else: h5_current_main[start_pix:end_pix, :] = data_2d # Flush to make sure that data is committed to HDF5 h5_current_main.file.flush() start_pix = end_pix if self._verbose: print('\t' * 4 + 'Done reading binary file') cur_file.close() @staticmethod def _read_old_mat_be_vecs(file_path, verbose=False): """ Returns information about the excitation BE waveform present in the more parms.mat file Parameters -------------------- filepath : String or unicode Absolute filepath of the .mat parameter file Returns -------------------- bin_inds : 1D numpy unsigned int array Indices of the excited and measured frequency bins bin_w : 1D numpy float array Excitation bin Frequencies bin_FFT : 1D numpy complex array FFT of the BE waveform for the excited bins BE_wave : 1D numpy float array Band Excitation waveform dc_amp_vec_full : 1D numpy float array spectroscopic waveform. This information will be necessary for fixing the UDVS for AC modulation for example """ matread = loadmat(file_path, squeeze_me=True) #TODO: What about key errors? BE_wave = matread['BE_wave'] bin_inds = matread['bin_ind'] - 1 # Python base 0 bin_w = matread['bin_w'] dc_amp_vec_full = matread['dc_amp_vec_full'] if verbose: for vec, var_name in zip([BE_wave, bin_inds, bin_w, dc_amp_vec_full], ['BE_wave', 'bin_inds', 'bin_w', 'dc_amp_vec_full']): print('\t\t{} has shape: {} and dtype: {}'.format(var_name, vec.shape, vec.dtype)) try: FFT_full = np.fft.fftshift(np.fft.fft(BE_wave)) except ValueError: FFT_full = BE_wave try: bin_FFT = np.conjugate(FFT_full[bin_inds]) except IndexError: bin_FFT = FFT_full return bin_inds, bin_w, bin_FFT, BE_wave, dc_amp_vec_full @staticmethod def _get_parms_from_old_mat(file_path, verbose=False): """ Formats parameters found in the old parameters .mat file into a dictionary as though the dataset had a parms.txt describing it Parameters -------------------- file_path : Unicode / String absolute filepath of the .mat file containing the parameters verbose : bool, optional, default = False Whether or not to print statemetns for debugging purposes Returns -------------------- parm_dict : dictionary Parameters describing experiment """ parm_dict = dict() matread = loadmat(file_path, squeeze_me=True) if verbose: print('\t\tEstimating File params from path: {}'.format(file_path)) parent, _ = path.split(file_path) parent, expt_name = path.split(parent) if expt_name.endswith('_c'): expt_name = expt_name[:-2] ind = expt_name.rfind('_0') suffix = 0 if ind > 0: try: suffix = int(expt_name[ind + 1:]) except ValueError: # print('Could not convert "' + suffix + '" to integer') pass expt_name = expt_name[:ind] parm_dict['File_file_path'] = parent parm_dict['File_file_name'] = expt_name parm_dict['File_file_suffix'] = suffix header = matread['__header__'].decode("utf-8") if verbose: print('\t\tEstimating experiment date and time from .mat file ' 'header: {} '.format(header)) targ_str = 'Created on: ' try: ind = header.index(targ_str) header = header[ind + len(targ_str):] # header = 'Wed Jan 04 13:11:21 2012' dt_obj = datetime.datetime.strptime(header, '%c') # '%a %b %d %H:%M:%S %Y') # parms.txt contains string formatted as: 09-Apr-2015 HH:MM:SS parm_dict['File_date_and_time'] = dt_obj.strftime( '%d-%b-%Y %H:%M:%S') except ValueError: pass parm_dict['IO_rate'] = str(int(matread['AO_rate'] / 1E+6)) + ' MHz' position_vec = matread['position_vec'] parm_dict['grid_current_row'] = position_vec[0] parm_dict['grid_current_col'] = position_vec[1] parm_dict['grid_num_rows'] = position_vec[2] parm_dict['grid_num_cols'] = position_vec[3] if position_vec[0] != position_vec[1] or position_vec[2] != \ position_vec[3]: warn('WARNING: Incomplete dataset. Translation not guaranteed!') parm_dict['grid_num_rows'] = position_vec[ 0] # set to number of present cols and rows parm_dict['grid_num_cols'] = position_vec[1] BE_parm_vec_1 = matread['BE_parm_vec_1'] try: BE_parm_vec_2 = matread['BE_parm_vec_2'] except KeyError: BE_parm_vec_2 = None if verbose: print('\t\tBE_parm_vec_1: {}'.format(BE_parm_vec_1)) print('\t\tBE_parm_vec_2: {}'.format(BE_parm_vec_2)) # Not required for translation but necessary to have if BE_parm_vec_1[0] == 3 or BE_parm_vec_2[0] == 3: parm_dict['BE_phase_content'] = 'chirp-sinc hybrid' else: parm_dict['BE_phase_content'] = 'Unknown' parm_dict['BE_center_frequency_[Hz]'] = BE_parm_vec_1[1] parm_dict['BE_band_width_[Hz]'] = BE_parm_vec_1[2] parm_dict['BE_amplitude_[V]'] = BE_parm_vec_1[3] parm_dict['BE_band_edge_smoothing_[s]'] = BE_parm_vec_1[ 4] # 150 most likely parm_dict['BE_phase_variation'] = BE_parm_vec_1[5] # 0.01 most likely parm_dict['BE_window_adjustment'] = BE_parm_vec_1[6] parm_dict['BE_points_per_step'] = 2 ** int(BE_parm_vec_1[7]) parm_dict['BE_repeats'] = 2 ** int(BE_parm_vec_1[8]) try: parm_dict['BE_bins_per_band'] = matread['bins_per_band_s'] except KeyError: parm_dict['BE_bins_per_band'] = len(matread['bin_w']) assembly_parm_vec = matread['assembly_parm_vec'] if verbose: print('\t\tassembly_parm_vec: {}'.format(assembly_parm_vec)) parm_dict['IO_Analog_Input_1'] = '+/- 10V, FFT' if assembly_parm_vec[3] == 0: parm_dict['IO_Analog_Input_2'] = 'off' else: parm_dict['IO_Analog_Input_2'] = '+/- 10V, FFT' # num_driving_bands = assembly_parm_vec[0] # 0 = 1, 1 = 2 bands # band_combination_order = assembly_parm_vec[1] # 0 parallel 1 series if 'SS_parm_vec' not in matread.keys(): if verbose: print('\t\tBE-Line dataset') return parm_dict if assembly_parm_vec[2] == 0: parm_dict['VS_measure_in_field_loops'] = 'out-of-field' elif assembly_parm_vec[2] == 1: parm_dict['VS_measure_in_field_loops'] = 'in and out-of-field' else: parm_dict['VS_measure_in_field_loops'] = 'in-field' VS_parms = matread['SS_parm_vec'] dc_amp_vec_full = matread['dc_amp_vec_full'] if verbose: print('\t\tVS_parms: {}'.format(VS_parms)) print('\t\tdc_amp_vec_full: {}'.format(dc_amp_vec_full)) VS_start_V = VS_parms[4] VS_start_loop_amp = VS_parms[5] VS_final_loop_amp = VS_parms[6] # VS_read_write_ratio = VS_parms[8] # 1 <- SS_read_write_ratio parm_dict['VS_set_pulse_amplitude_[V]'] = VS_parms[9] # 0 <- SS_set_pulse_amp parm_dict['VS_read_voltage_[V]'] = VS_parms[3] parm_dict['VS_steps_per_full_cycle'] = int(VS_parms[7]) # These two will be assigned after the initial round of parsing parm_dict['VS_cycle_fraction'] = 'full' parm_dict['VS_cycle_phase_shift'] = 0 parm_dict['VS_number_of_cycles'] = int(VS_parms[2]) parm_dict['FORC_num_of_FORC_cycles'] = 1 parm_dict['FORC_V_high1_[V]'] = 1 parm_dict['FORC_V_high2_[V]'] = 10 parm_dict['FORC_V_low1_[V]'] = -1 parm_dict['FORC_V_low2_[V]'] = -10 if VS_parms[0] in [0, 8, 9, 11]: if verbose: print('\t\tDC modulation or current mode based on VS_parms[0]:' ' {}'.format(VS_parms[0])) parm_dict['VS_mode'] = 'DC modulation mode' if VS_parms[0] == 9: if verbose: print('\t\tcurrent mode based on VS parms[0]') parm_dict['VS_mode'] = 'current mode' parm_dict['VS_amplitude_[V]'] = 0.5 * (max(dc_amp_vec_full) - np.min(dc_amp_vec_full)) # SS_max_offset_amplitude parm_dict['VS_offset_[V]'] = np.max(dc_amp_vec_full) + np.min( dc_amp_vec_full) elif VS_parms[0] in [1, 6, 7]: if verbose: print('\t\tFORC, based on VS_parms[0]: {}'.format(VS_parms[0])) # Could not tell difference between mode = 1 or 6 # mode 7 = multiple FORC cycles parm_dict['VS_mode'] = 'DC modulation mode' parm_dict['VS_amplitude_[V]'] = 1 # VS_parms[1] # SS_max_offset_amplitude parm_dict['VS_offset_[V]'] = 0 if VS_parms[0] == 7: if verbose: print('\t\t\tFORC with 2 cycles') parm_dict['VS_number_of_cycles'] = 2 parm_dict['FORC_num_of_FORC_cycles'] = VS_parms[2] // 2 else: if verbose: print('\t\t\tFORC with 1 cycle') parm_dict['VS_number_of_cycles'] = 1 parm_dict['FORC_num_of_FORC_cycles'] = VS_parms[2] if True: if verbose: print('\t\t\tMeasuring FORC high and low vals from DC vec') # Grabbing hi lo values directly from the vec left = dc_amp_vec_full[:parm_dict['VS_steps_per_full_cycle']] right = dc_amp_vec_full[ -1 * parm_dict['VS_steps_per_full_cycle']:] parm_dict['FORC_V_high1_[V]'] = np.max(left) parm_dict['FORC_V_high2_[V]'] = np.max(right) parm_dict['FORC_V_low1_[V]'] = np.min(left) parm_dict['FORC_V_low2_[V]'] = np.min(right) else: if verbose: print('\t\t\tGrabbing FORC high and low vals from parms') # Not getting correct values with prescribed method parm_dict['FORC_V_high1_[V]'] = VS_start_V parm_dict['FORC_V_high2_[V]'] = VS_start_V parm_dict['FORC_V_low1_[V]'] = VS_start_V - VS_start_loop_amp parm_dict['FORC_V_low2_[V]'] = VS_start_V - VS_final_loop_amp elif VS_parms[0] in [2, 3, 4]: if verbose: print('\t\tAC Spectroscopy with time reversal, based on ' 'VS_parms[0]: {}'.format(VS_parms[0])) if VS_parms[0] == 3: # These numbers seemed to match with the v_dc vector parm_dict['VS_number_of_cycles'] = int(VS_parms[2]) * 2 parm_dict['VS_steps_per_full_cycle'] = int(VS_parms[7] // 2) if VS_parms[0] == 4: # cycles are not tracked: slopes = np.diff(dc_amp_vec_full) num_cycles = len(np.where(slopes < 0)[0]) + 1 parm_dict['VS_number_of_cycles'] = num_cycles parm_dict['VS_steps_per_full_cycle'] = int(VS_parms[7] // num_cycles) parm_dict['VS_mode'] = 'AC modulation mode with time reversal' parm_dict['VS_amplitude_[V]'] = 0.5 * VS_final_loop_amp parm_dict['VS_offset_[V]'] = 0 # this is not correct. Fix manually when it comes to UDVS generation? else: # Did not see any examples of this... warn('Unknown VS mode: {}. Assuming user defined custom voltage ' 'spectroscopy'.format(VS_parms[0])) parm_dict['VS_mode'] = 'Custom' # Assigning the phase and fraction for bi-polar triangular waveforms if VS_parms[0] not in [2, 3, 4]: if verbose: print('\t\tEstimating phase and fraction based on slopes of first cycle') slopes = [] for ind in range(4): subsection = dc_amp_vec_full[ind * parm_dict['VS_steps_per_full_cycle'] // 4: (ind + 1) * parm_dict['VS_steps_per_full_cycle'] // 4] slopes.append(np.mean(np.diff(subsection))) if verbose: print('\t\t\tslopes for quarters: {}'.format(slopes)) frac, phas = infer_bipolar_triangular_fraction_phase(slopes) if verbose: print('\t\t\tCycle fraction: {}, Phase: {}'.format(frac, phas)) for str_val, num_val in zip(['full', '1/2', '1/4', '3/4'], [1., 0.5, 0.25, 0.75]): if frac == num_val: parm_dict['VS_cycle_fraction'] = str_val break for str_val, num_val in zip(['1/4', '1/2', '3/4'], [0.25, 0.5, 0.75]): if phas == num_val: parm_dict['VS_cycle_phase_shift'] = str_val break if verbose: print('\t\t\tCycle fraction: {}, Phase: {}' ''.format(parm_dict['VS_cycle_fraction'], parm_dict['VS_cycle_phase_shift'])) return parm_dict @staticmethod def _read_parms_mat(file_path, is_beps): """ Returns information about the excitation BE waveform present in the more parms.mat file Parameters -------------------- file_path : String / Unicode Absolute filepath of the .mat parameter file is_beps : Boolean Whether or not this is BEPS or BE-Line Returns -------------------- BE_bin_ind : 1D numpy unsigned int array Indices of the excited and measured frequency bins BE_bin_w : 1D numpy float array Excitation bin Frequencies BE_bin_FFT : 1D numpy complex array FFT of the BE waveform for the excited bins ex_wfm : 1D numpy float array Band Excitation waveform """ if not path.exists(file_path): raise IOError('NO "More parms" file found') if is_beps: fft_name = 'FFT_BE_wave' else: fft_name = 'FFT_BE_rev_wave' matread = loadmat(file_path, variable_names=['BE_bin_ind', 'BE_bin_w', fft_name]) BE_bin_ind = np.squeeze(matread['BE_bin_ind']) - 1 # From Matlab (base 1) to Python (base 0) BE_bin_w = np.squeeze(matread['BE_bin_w']) FFT_full = np.complex64(np.squeeze(matread[fft_name])) # For whatever weird reason, the sign of the imaginary portion is flipped. Correct it: # BE_bin_FFT = np.conjugate(FFT_full[BE_bin_ind]) BE_bin_FFT = np.zeros(len(BE_bin_ind), dtype=np.complex64) BE_bin_FFT.real = np.real(FFT_full[BE_bin_ind]) BE_bin_FFT.imag = -1 * np.imag(FFT_full[BE_bin_ind]) ex_wfm = np.real(np.fft.ifft(np.fft.ifftshift(FFT_full))) return BE_bin_ind, BE_bin_w, BE_bin_FFT, ex_wfm def _build_udvs_table(self, parm_dict): """ Generates the UDVS table using the parameters Parameters -------------------- parm_dict : dictionary Parameters describing experiment Returns -------------------- UD_VS_table_label : List of strings Labels for columns in the UDVS table UD_VS_table_unit : List of strings Units for the columns in the UDVS table UD_VS_table : 2D numpy float array UDVS data table """ def translate_val(target, strvals, numvals): """ Internal function - Interprets the provided value using the provided lookup table Parameters ---------- target : String Item we are looking for in the strvals list strvals : list of strings List of source values numvals : list of numbers List of results """ if len(strvals) != len(numvals): return None for strval, fltval in zip(strvals, numvals): if target == strval: return fltval return None # not found in list # % Extract values from parm text file BE_signal_type = translate_val(parm_dict['BE_phase_content'], ['chirp-sinc hybrid', '1/2 harmonic excitation', '1/3 harmonic excitation', 'pure sine'], [1, 2, 3, 4]) if BE_signal_type == None: raise NotImplementedError('This translator does not know how to ' 'handle "BE_phase_content": "{}"' ''.format(parm_dict['BE_phase_content'])) # This is necessary when normalizing the AI by the AO self.harmonic = BE_signal_type self.signal_type = BE_signal_type if BE_signal_type == 4: self.harmonic = 1 BE_amp = parm_dict['BE_amplitude_[V]'] try: VS_amp = parm_dict['VS_amplitude_[V]'] VS_offset = parm_dict['VS_offset_[V]'] # VS_read_voltage = parm_dict['VS_read_voltage_[V]'] VS_steps = parm_dict['VS_steps_per_full_cycle'] VS_cycles = parm_dict['VS_number_of_cycles'] VS_fraction = translate_val(parm_dict['VS_cycle_fraction'], ['full', '1/2', '1/4', '3/4'], [1., 0.5, 0.25, 0.75]) if VS_fraction is None: raise NotImplementedError( 'This translator does not know how to ' 'handle "VS_cycle_fraction": "{}"' ''.format(parm_dict['VS_cycle_fraction'])) VS_shift = parm_dict['VS_cycle_phase_shift'] except KeyError as exp: raise KeyError(exp) if VS_shift != 0: if self._verbose: print('\tVS_shift = {}'.format(VS_shift)) VS_shift = translate_val(VS_shift, ['1/4', '1/2', '3/4'], [0.25, 0.5, 0.75]) if VS_shift is None: raise NotImplementedError( 'This translator does not know how to ' 'handle "VS_cycle_phase_shift": "{}"' ''.format(parm_dict['VS_cycle_phase_shift'])) VS_in_out_cond = translate_val(parm_dict['VS_measure_in_field_loops'], ['out-of-field', 'in-field', 'in and out-of-field'], [0, 1, 2]) if VS_in_out_cond is None: raise NotImplementedError('This translator does not know how to ' 'handle "VS_measure_in_field_loops": ' '"{}"'.format(parm_dict['VS_measure_in_field_loops'])) VS_ACDC_cond = translate_val(parm_dict['VS_mode'], ['DC modulation mode', 'AC modulation mode with time reversal', 'load user defined VS Wave from file', 'current mode'], [0, 2, 3, 4]) if VS_ACDC_cond is None: raise NotImplementedError('This translator does not know how to ' 'handle "VS_Mode": "{}"' ''.format(parm_dict['VS_mode'])) self.expt_type = VS_ACDC_cond FORC_cycles = parm_dict['FORC_num_of_FORC_cycles'] FORC_A1 = parm_dict['FORC_V_high1_[V]'] FORC_A2 = parm_dict['FORC_V_high2_[V]'] # FORC_repeats = parm_dict['# of FORC repeats'] FORC_B1 = parm_dict['FORC_V_low1_[V]'] FORC_B2 = parm_dict['FORC_V_low2_[V]'] # % build vector of voltage spectroscopy values if self._verbose: print('\t\tBuilding spectroscopy waveform') if VS_ACDC_cond == 0 or VS_ACDC_cond == 4: if self._verbose: print('\t\t\tDC voltage spectroscopy or current mode for:') for key in ['VS_steps', 'VS_fraction', 'VS_shift', 'VS_cycles', 'VS_amp', 'VS_offset']: print('\t'*4 + '{} = {}'.format(key, repr(eval(key)))) vs_amp_vec = generate_bipolar_triangular_waveform(VS_steps, cycle_frac=VS_fraction, phase=VS_shift, amplitude=VS_amp, cycles=VS_cycles, offset=VS_offset) if self._verbose: print('\t\t\tgenerated triangular waveform of size: {}' ''.format(len(vs_amp_vec))) elif VS_ACDC_cond == 2: if self._verbose: print('\t\t\tAC voltage spectroscopy with time reversal') # Temporarily scale up the number of points in a cycle actual_cycle_pts = int(VS_steps // VS_fraction) vs_amp_vec = np.linspace(VS_amp / actual_cycle_pts, VS_amp, num=actual_cycle_pts, endpoint=True) # Apply phase offset via a roll: vs_amp_vec = np.roll(vs_amp_vec, int(VS_shift * actual_cycle_pts)) # Next truncate by the fraction vs_amp_vec = vs_amp_vec[:VS_steps] # Next offset: vs_amp_vec += VS_offset # Finally, tile by the number of cycles vs_amp_vec = np.tile(vs_amp_vec, int(VS_cycles)) if FORC_cycles > 1: if self._verbose: print('\t\t\tWorking on adding FORC') vs_amp_vec = vs_amp_vec / np.max(np.abs(vs_amp_vec)) FORC_cycle_vec = np.arange(0, FORC_cycles + 1, FORC_cycles / (FORC_cycles - 1)) FORC_A_vec = FORC_cycle_vec * (FORC_A2 - FORC_A1) / FORC_cycles + FORC_A1 FORC_B_vec = FORC_cycle_vec * (FORC_B2 - FORC_B1) / FORC_cycles + FORC_B1 FORC_amp_vec = (FORC_A_vec - FORC_B_vec) / 2 FORC_off_vec = (FORC_A_vec + FORC_B_vec) / 2 VS_amp_mat = np.tile(vs_amp_vec, [int(FORC_cycles), 1]) FORC_amp_mat = np.tile(FORC_amp_vec, [len(vs_amp_vec), 1]).transpose() FORC_off_mat = np.tile(FORC_off_vec, [len(vs_amp_vec), 1]).transpose() VS_amp_mat = VS_amp_mat * FORC_amp_mat + FORC_off_mat vs_amp_vec = VS_amp_mat.reshape(int(FORC_cycles * VS_cycles * VS_steps)) # Build UDVS table: if self._verbose: print('\t\tBuilding UDVS table') if VS_ACDC_cond == 0 or VS_ACDC_cond == 4: if VS_ACDC_cond == 0: if self._verbose: print('\t'*3 + 'DC Spectroscopy mode. Appending zeros') UD_dc_vec = np.vstack((vs_amp_vec, np.zeros(len(vs_amp_vec)))) if VS_ACDC_cond == 4: if self._verbose: print('\t'*3 + 'Current mode. Tiling vs amp vec') UD_dc_vec = np.vstack((vs_amp_vec, vs_amp_vec)) UD_dc_vec = UD_dc_vec.transpose().reshape(UD_dc_vec.size) num_VS_steps = UD_dc_vec.size UD_VS_table_label = ['step_num', 'dc_offset', 'ac_amp', 'wave_type', 'wave_mod', 'in-field', 'out-of-field'] UD_VS_table_unit = ['', 'V', 'A', '', '', 'V', 'V'] udvs_table = np.zeros(shape=(num_VS_steps, 7), dtype=np.float32) udvs_table[:, 0] = np.arange(0, num_VS_steps) # Python base 0 udvs_table[:, 1] = UD_dc_vec BE_IF_switch = np.abs(np.imag(np.exp(1j * np.pi / 2 * np.arange(1, num_VS_steps + 1)))) BE_OF_switch = np.abs(np.real(np.exp(1j * np.pi / 2 * np.arange(1, num_VS_steps + 1)))) """ Regardless of the field condition, both in and out of field rows WILL exist in the table. """ if VS_in_out_cond == 0: # out of field only udvs_table[:, 2] = BE_amp * BE_OF_switch elif VS_in_out_cond == 1: # in field only udvs_table[:, 2] = BE_amp * BE_IF_switch elif VS_in_out_cond == 2: # both in and out of field udvs_table[:, 2] = BE_amp * np.ones(num_VS_steps) udvs_table[:, 3] = np.ones(num_VS_steps) # wave type udvs_table[:, 4] = np.ones(num_VS_steps) * BE_signal_type # wave mod udvs_table[:, 5] = float('NaN') * np.ones(num_VS_steps) udvs_table[:, 6] = float('NaN') * np.ones(num_VS_steps) udvs_table[BE_IF_switch == 1, 5] = udvs_table[BE_IF_switch == 1, 1] udvs_table[BE_OF_switch == 1, 6] = udvs_table[BE_OF_switch == 1, 1] elif VS_ACDC_cond == 2: if self._verbose: print('\t\t\tAC voltage spectroscopy') num_VS_steps = vs_amp_vec.size half = int(0.5 * num_VS_steps) if num_VS_steps != half * 2: raise ValueError('Odd number of UDVS steps found. Exiting!') UD_dc_vec = VS_offset * np.ones(num_VS_steps) UD_VS_table_label = ['step_num', 'dc_offset', 'ac_amp', 'wave_type', 'wave_mod', 'forward', 'reverse'] UD_VS_table_unit = ['', 'V', 'A', '', '', 'A', 'A'] udvs_table = np.zeros(shape=(num_VS_steps, 7), dtype=np.float32) udvs_table[:, 0] = np.arange(1, num_VS_steps + 1) udvs_table[:, 1] = UD_dc_vec udvs_table[:, 2] = vs_amp_vec udvs_table[:, 3] = np.ones(num_VS_steps) udvs_table[:half, 4] = BE_signal_type * np.ones(half) udvs_table[half:, 4] = -1 * BE_signal_type * np.ones(half) udvs_table[:, 5] = float('NaN') * np.ones(num_VS_steps) udvs_table[:, 6] = float('NaN') * np.ones(num_VS_steps) udvs_table[:half, 5] = vs_amp_vec[:half] udvs_table[half:, 6] = vs_amp_vec[half:] else: raise NotImplementedError('Not handling VS_ACDC condition: {}'.format(VS_ACDC_cond)) return UD_VS_table_label, UD_VS_table_unit, udvs_table @staticmethod def _get_random_spectra(parsers, num_pixels, num_udvs_steps, num_bins, num_spectra=100, verbose=False): """ Parameters ---------- parsers : list of BEodfParser objects parsers to seek into files to grab spectra num_pixels : unsigned int Number of spatial positions in the image num_udvs_steps : unsigned int Number of UDVS steps num_bins : unsigned int Number of frequency bins in every UDVS step num_spectra : unsigned int Total number of spectra to be extracted verbose : Boolean, optional Whether or not to print debugging statements Returns ------- chosen_spectra : 2D complex numpy array spectrogram or spectra arranged as [instance, spectrum] """ if verbose: print('\t' * 4 + 'Getting random spectra for Q factor testing') print('\t' * 4 + 'num_pixels: {} num_udvs_steps: {}, num_bins: {},' ' num_spectra: {}'.format(num_pixels, num_udvs_steps, num_bins, num_spectra)) num_pixels = int(num_pixels) num_udvs_steps = int(num_udvs_steps) num_bins = int(num_bins) num_spectra = min(num_spectra, len(parsers) * num_pixels * num_udvs_steps) selected_pixels = np.random.randint(0, num_pixels, size=num_spectra) selected_steps = np.random.randint(0, num_udvs_steps, size=num_spectra) selected_parsers = np.random.randint(0, len(parsers), size=num_spectra) if verbose and False: print('\t' * 4 + 'Selecting the following random pixels, ' 'UDVS steps, parsers') print('\t' * 4 + 'num_spectra: {}'.format(num_spectra)) print('\t' * 4 + 'selected_pixels:\n{}'.format(selected_pixels)) print('\t' * 4 + 'selected_steps:\n{}'.format(selected_steps)) print('\t' * 4 + 'selected_parsers:\n{}'.format(selected_parsers)) chosen_spectra = list() # np.zeros(shape=(num_spectra, num_bins), dtype=np.complex64) for spectra_index in range(num_spectra): prsr = parsers[selected_parsers[spectra_index]] prsr.seek_to_pixel(selected_pixels[spectra_index]) if verbose and False: print('\t' * 5 + 'Seeking and reading pixel #{}' ''.format(selected_pixels[spectra_index])) raw_vec = prsr.read_pixel() if len(raw_vec) < 1: # Empty pixel at the end of the file that may be missing continue spectrogram = raw_vec.reshape(num_udvs_steps, -1) if verbose and False: print('\t' * 5 + 'reshaped raw vector for pixel of shape {} by' ' UDVS step to: {} and taking spectrum at ' 'index: {}'.format(raw_vec.shape, spectrogram.shape, selected_steps[spectra_index])) # chosen_spectra[spectra_index] = spectrogram[selected_steps[spectra_index]] chosen_spectra.append(spectrogram[selected_steps[spectra_index]]) for prsr in parsers: prsr.reset() chosen_spectra = np.array(chosen_spectra) if verbose: print('\t' * 5 + 'chosen spectra of shape: {}' ''.format(chosen_spectra.shape)) return chosen_spectra
[docs] class BEodfParser(object): def __init__(self, real_path, imag_path, num_pix, bytes_per_pix): """ This object reads the two binary data files (real and imaginary data). Use separate parser instances for in-field and out-field data sets. Parameters -------------------- real_path : String / Unicode absolute path of the binary file containing the real portion of the data imag_path : String / Unicode absolute path of the binary file containing the imaginary portion of the data num_pix : unsigned int Number of pixels in this image bytes_per_pix : unsigned int Number of bytes per pixel """ self.f_real = open(real_path, "rb") self.f_imag = open(imag_path, "rb") self.__num_pix__ = num_pix self.__bytes_per_pix__ = bytes_per_pix self.__pix_indx__ = 0
[docs] def read_pixel(self): """ Returns the content of the next pixel Returns ------- raw_vec : 1D numpy complex64 array Content of one pixel's data """ if self.__num_pix__ is not None: if self.__pix_indx__ == self.__num_pix__: warn('BEodfParser - No more pixels to read!') return None self.f_real.seek(self.__pix_indx__ * self.__bytes_per_pix__, 0) real_vec = np.fromstring(self.f_real.read(self.__bytes_per_pix__), dtype='f') self.f_imag.seek(self.__pix_indx__ * self.__bytes_per_pix__, 0) imag_vec = np.fromstring(self.f_imag.read(self.__bytes_per_pix__), dtype='f') raw_vec = np.zeros(len(real_vec), dtype=np.complex64) raw_vec.real = real_vec raw_vec.imag = imag_vec self.__pix_indx__ += 1 if self.__pix_indx__ == self.__num_pix__: self.f_real.close() self.f_imag.close() return raw_vec
[docs] def read_all_data(self): """ Returns the complete contents of the file pair Returns ------- raw_vec : 1D numpy complex64 array Entire content of the file pair """ self.f_real.seek(0, 0) self.f_imag.seek(0, 0) d_real = np.fromstring(self.f_real.read(), dtype='f') d_imag = np.fromstring(self.f_imag.read(), dtype='f') full_file = d_real + 1j * d_imag self.f_real.close() self.f_imag.close() return full_file
[docs] def seek_to_pixel(self, pixel_ind): """ Parameters ---------- pixel_ind Returns ------- """ if self.__num_pix__ is not None: pixel_ind = min(pixel_ind, self.__num_pix__) self.__pix_indx__ = pixel_ind
[docs] def reset(self): """ """ self.f_real.seek(0, 0) self.f_imag.seek(0, 0) self.__pix_indx__ = 0