# -*- coding: utf-8 -*-
"""
Created on Tue Nov 3 15:24:12 2015
@author: Chris R. Smith
"""
from __future__ import division, print_function, absolute_import, unicode_literals
from warnings import warn
import sys
import datetime
import h5py
import os
import numpy as np
from sidpy.sid import Translator
from sidpy.hdf.hdf_utils import get_attr, write_simple_attrs
from pyUSID.io.hdf_utils import link_as_main, find_results_groups, \
check_and_link_ancillary, find_dataset
from pyUSID.io.anc_build_utils import create_spec_inds_from_vals
from .df_utils.be_utils import remove_non_exist_spec_dim_labs
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that
# isinstance checks written against (str, unicode) work on both major versions.
if sys.version_info[0] == 3:
    unicode = str
# Translator that patches LabView V3 HDF5 files in place.
class LabViewH5Patcher(Translator):
    """
    Patches the hdf5 files from the LabView V3 data acquisition software to meet the
    standards of the Pycroscopy data format.

    The file is modified in place: missing attributes are added, the
    ancillary (position / spectroscopic) datasets are repaired where needed
    and linked to every ``Raw_Data`` dataset of a ``'BE'`` channel and to the
    ``Guess`` dataset of each of its ``SHO_Fit`` result groups.
    """

    def __init__(self):
        super(LabViewH5Patcher, self).__init__()

    def _parse_file_path(self, input_path):
        # Nothing to parse: translate() works directly on the given h5 path.
        pass

    def _read_data(self):
        # Nothing to read: the data already lives in the h5 file being patched.
        pass

    @staticmethod
    def is_valid_file(file_path):
        """
        Checks whether the provided file can be read by this translator

        Parameters
        ----------
        file_path : str
            Path to raw data file

        Returns
        -------
        obj : str
            Path to file that will be accepted by the translate() function if
            this translator is indeed capable of translating the provided file.
            Otherwise, None will be returned

        Raises
        ------
        TypeError
            If ``file_path`` is not a string
        """
        if not isinstance(file_path, (str, unicode)):
            raise TypeError('file_path should be a string object')
        if not os.path.isfile(file_path):
            return None

        file_path = os.path.abspath(file_path)
        extension = os.path.splitext(file_path)[1][1:]
        if extension not in ['h5', 'hdf5']:
            return None

        # Opened in the same read/write mode that translate() uses.
        # Any failure to open simply means "not a file we can handle".
        try:
            h5_f = h5py.File(file_path, 'r+')
        except Exception:
            return None

        # Always release the handle, whatever the outcome of the checks.
        try:
            # TODO: Make this check a lot stronger. Currently brittle
            if 'DAQ_software_version_name' not in h5_f.attrs.keys():
                return None
            if len(find_dataset(h5_f, 'Raw_Data')) < 1:
                return None
        finally:
            h5_f.close()

        return file_path

    def translate(self, h5_path, force_patch=False, **kwargs):
        """
        Add the needed references and attributes to the h5 file that are not created by the
        LabView data acquisition program.

        Parameters
        ----------
        h5_path : str
            path to the h5 file
        force_patch : bool, optional
            Should the check to see if the file has already been patched be ignored.
            Default False.

        Returns
        -------
        h5_file : str
            path to the patched dataset
        """
        # TODO: Need a way to choose which channels to apply the patcher to,
        # fails for multi-channel files where not all files are capable of being main datasets

        # Open the file and check if a patch is needed
        h5_file = h5py.File(os.path.abspath(h5_path), 'r+')
        # try / finally guarantees the file is closed on every exit path,
        # including the early returns and any exception raised while patching.
        try:
            if h5_file.attrs.get('translator') is not None and not force_patch:
                print('File is already Pycroscopy ready.')
                return h5_path

            # Loop over all Raw_Data datasets and update the needed attributes
            for h5_raw in find_dataset(h5_file, 'Raw_Data'):
                if 'quantity' not in h5_raw.attrs:
                    h5_raw.attrs['quantity'] = 'quantity'
                if 'units' not in h5_raw.attrs:
                    h5_raw.attrs['units'] = 'a.u.'

                # Grab the channel and measurement group of the data to check some needed attributes
                h5_chan = h5_raw.parent
                try:
                    c_type = get_attr(h5_chan, 'channel_type')
                except KeyError:
                    warn_str = "'channel_type' was not found as an attribute of {}.\n".format(h5_chan.name)
                    warn_str += "If this is BEPS or BELine data from the LabView aquisition software, " + \
                                "please run the following piece of code.  Afterwards, run this function again.\n" + \
                                "CODE: " \
                                "hdf.file['{}'].attrs['channel_type'] = 'BE'".format(h5_chan.name)
                    warn(warn_str)
                    # Patching is abandoned here; the 'translator' attribute is not written.
                    return h5_path

                # Only band-excitation channels are patched
                if c_type != 'BE':
                    continue

                h5_meas = h5_chan.parent
                h5_meas.attrs['num_UDVS_steps'] = h5_meas.attrs['num_steps']

                # Get the object handles for the Indices and Values datasets
                h5_pos_inds = h5_chan['Position_Indices']
                h5_pos_vals = h5_chan['Position_Values']
                h5_spec_inds = h5_chan['Spectroscopic_Indices']
                h5_spec_vals = h5_chan['Spectroscopic_Values']

                # Make sure we have correct spectroscopic indices for the given values
                ds_spec_inds = create_spec_inds_from_vals(h5_spec_vals[()])
                if not np.allclose(ds_spec_inds, h5_spec_inds[()]):
                    h5_spec_inds[:, :] = ds_spec_inds[:, :]
                    h5_file.flush()

                # Every ancillary dataset needs one unit per dimension label.
                # Missing or mismatched units are replaced by empty strings.
                for dset in (h5_pos_inds, h5_pos_vals, h5_spec_inds, h5_spec_vals):
                    dim_labels = dset.attrs['labels']
                    try:
                        units_ok = len(dset.attrs['units']) == len(dim_labels)
                    except KeyError:
                        units_ok = False
                    if not units_ok:
                        # Written to the attributes (not into the dataset itself),
                        # as encoded strings like in the SHO handling.
                        dset.attrs['units'] = [''.encode('utf-8') for _ in dim_labels]

                # In early versions, too many spectroscopic dimension labels and
                # units were listed compared to the number of rows. Remove here:
                remove_non_exist_spec_dim_labs(h5_spec_inds, h5_spec_vals,
                                               h5_meas, verbose=False)

                # Add back some standard metadata to be consistent with older BE data
                missing_metadata = self._collect_missing_metadata(h5_raw, h5_meas)
                # Now write to measurement group:
                if missing_metadata:
                    write_simple_attrs(h5_meas, missing_metadata)

                # Link the references to the Indices and Values datasets to the Raw_Data
                print(h5_raw.shape,
                      h5_pos_vals.shape, h5_spec_vals.shape)
                print(h5_spec_inds.shape, h5_pos_inds.shape)
                link_as_main(h5_raw, h5_pos_inds, h5_pos_vals, h5_spec_inds, h5_spec_vals)

                # Also link the Bin_Frequencies dataset
                h5_freqs = h5_chan['Bin_Frequencies']
                check_and_link_ancillary(h5_raw, ['Bin_Frequencies'], anc_refs=[h5_freqs.ref])

                # Patch every SHO_Fit results group that belongs to this Raw_Data
                for h5_sho in find_results_groups(h5_raw, 'SHO_Fit'):
                    self._patch_sho_group(h5_sho, h5_pos_inds, h5_pos_vals)

                h5_file.flush()

            # Mark the file as patched so that later calls can skip it
            h5_file.attrs['translator'] = 'V3patcher'.encode('utf-8')
        finally:
            h5_file.close()

        return h5_path

    @staticmethod
    def _collect_missing_metadata(h5_raw, h5_meas):
        """
        Build the attributes that are absent from the measurement group.

        Parameters
        ----------
        h5_raw : h5py.Dataset
            Raw_Data dataset; its file supplies the file name and the
            'date_string' / 'time_string' attributes
        h5_meas : h5py.Group
            Measurement group whose attributes are inspected

        Returns
        -------
        missing_metadata : dict
            May contain 'File_file_name' and / or 'File_date_and_time'
        """
        missing_metadata = dict()

        if 'File_file_name' not in h5_meas.attrs.keys():
            # Strip whatever extension the file has ('.h5' as well as '.hdf5')
            base_name = os.path.basename(h5_raw.file.filename)
            missing_metadata['File_file_name'] = os.path.splitext(base_name)[0]

        if 'File_date_and_time' not in h5_meas.attrs.keys():
            try:
                date_str = get_attr(h5_raw.file, 'date_string')
                time_str = get_attr(h5_raw.file, 'time_string')
            except KeyError:
                # Best effort: without both strings there is nothing to convert
                return missing_metadata

            full_str = date_str.strip() + ' ' + time_str.strip()
            # convert:
            #     date_string : 2018-12-05
            #     time_string : 3:41:45 PM
            # to:
            #     File_date_and_time: 19-Jun-2009 18:44:56
            try:
                dt_obj = datetime.datetime.strptime(full_str,
                                                    "%Y-%m-%d %I:%M:%S %p")
                missing_metadata['File_date_and_time'] = dt_obj.strftime('%d-%b-%Y %H:%M:%S')
            except ValueError:
                # Unexpected date / time format: leave the attribute out
                pass

        return missing_metadata

    @staticmethod
    def _get_or_create_fixed_dset(h5_group, dset_name):
        """
        Return the (1, 1) uint32 placeholder dataset called ``dset_name`` in
        ``h5_group``, creating it (zero valued, with placeholder 'labels' and
        'units' attributes) if it does not exist yet.
        """
        # Explicit membership test instead of relying on the exception type
        # raised by create_dataset() for a name that already exists.
        if dset_name in h5_group:
            print("It seems that the file has already been patched."
                  " Will use previously computed ancilliary datasets")
            return h5_group[dset_name]

        h5_dset = h5_group.create_dataset(dset_name, shape=(1, 1), dtype='uint32')
        h5_dset[:] = 0
        h5_dset.attrs['labels'] = 'labels'
        h5_dset.attrs['units'] = 'units'
        return h5_dset

    @staticmethod
    def _patch_sho_group(h5_sho, h5_pos_inds, h5_pos_vals):
        """
        Repair the spectroscopic datasets of one SHO_Fit group and link its
        Guess dataset to the position and spectroscopic ancillary datasets.

        Parameters
        ----------
        h5_sho : h5py.Group
            SHO_Fit results group
        h5_pos_inds : h5py.Dataset
            Position Indices dataset of the parent channel
        h5_pos_vals : h5py.Dataset
            Position Values dataset of the parent channel
        """
        h5_sho_guess = h5_sho['Guess']
        h5_sho_spec_inds = h5_sho['Spectroscopic_Indices']
        h5_sho_spec_vals = h5_sho['Spectroscopic_Values']

        # Make sure we have correct spectroscopic indices for the given values
        # (derived from the values, exactly as is done for the Raw_Data).
        ds_sho_spec_inds = create_spec_inds_from_vals(h5_sho_spec_vals[()])
        if not np.allclose(ds_sho_spec_inds, h5_sho_spec_inds[()]):
            h5_sho_spec_inds[:, :] = ds_sho_spec_inds[:, :]

        if h5_sho_spec_inds.shape[-1] != h5_sho_guess.shape[-1]:
            print('Warning! Found incorrect spectral dimension for dataset {}. Attempting a fix.'.format(h5_sho_guess))
            # Swap in single-element placeholder ancillary datasets that live
            # next to the Guess dataset; reused if a previous run made them.
            h5_sho_spec_inds = LabViewH5Patcher._get_or_create_fixed_dset(
                h5_sho_guess.parent, 'h5_sho_spec_inds_fixed')
            h5_sho_spec_vals = LabViewH5Patcher._get_or_create_fixed_dset(
                h5_sho_guess.parent, 'h5_sho_spec_vals_fixed')

        link_as_main(h5_sho_guess, h5_pos_inds, h5_pos_vals, h5_sho_spec_inds, h5_sho_spec_vals)

        # One unit per label; missing or mismatched units become empty strings
        for dset in (h5_sho_spec_inds, h5_sho_spec_vals):
            spec_labels = get_attr(dset, 'labels')
            try:
                units_ok = len(get_attr(dset, 'units')) == len(spec_labels)
            except KeyError:
                units_ok = False
            if not units_ok:
                dset.attrs['units'] = [''.encode('utf-8') for _ in spec_labels]