Source code for pyUSID.io.hdf_utils.simple

# -*- coding: utf-8 -*-
"""
Lower-level and simpler USID-specific HDF5 utilities that facilitate higher-level data operations

Created on Tue Nov  3 21:14:25 2015

@author: Suhas Somnath, Chris Smith
"""
from __future__ import division, print_function, absolute_import, unicode_literals
import collections.abc as collections
from warnings import warn
import sys
import h5py
import numpy as np
import dask.array as da
from sidpy.hdf.hdf_utils import get_auxiliary_datasets, link_h5_obj_as_alias, \
    write_simple_attrs, is_editable_h5, validate_h5_objs_in_same_h5_file, \
    get_attr
from sidpy.hdf.dtype_utils import validate_dtype
from sidpy.hdf import hdf_utils as hut
from sidpy.base.string_utils import validate_single_string_arg, validate_list_of_strings
from sidpy.base.num_utils import contains_integers
from sidpy.base.string_utils import clean_string_att

from ..anc_build_utils import build_ind_val_matrices, INDICES_DTYPE, VALUES_DTYPE
from ..dimension import DimType, Dimension
from .base import write_book_keeping_attrs

if sys.version_info.major == 3:
    unicode = str
"""
__all__ = ['assign_group_index', 'check_and_link_ancillary', 'check_for_matching_attrs', 'check_for_old',
           'check_if_main', 'copy_attributes', 'copy_main_attributes']
"""


def get_all_main(parent, verbose=False):
    """
    Simple function to recursively find all Main datasets within an HDF5 group

    Parameters
    ----------
    parent : :class:`h5py.Group`
        HDF5 Group to search within
    verbose : bool, optional. Default = False
        If true, extra print statements (usually for debugging) are enabled

    Returns
    -------
    main_list : list of h5py.Dataset
        The datasets found in the file that meet the 'Main Data' criteria.
    """
    if not isinstance(parent, (h5py.Group, h5py.File)):
        raise TypeError('parent should be a h5py.File or h5py.Group object')

    from ..usi_data import USIDataset

    main_list = list()

    def __check(name, obj):
        if verbose:
            print(name, obj)
        if isinstance(obj, h5py.Dataset):
            if verbose:
                print(name, 'is an HDF5 Dataset.')
            ismain = check_if_main(obj)
            if ismain:
                if verbose:
                    print(name, 'is a `Main` dataset.')
                main_list.append(USIDataset(obj))

    if verbose:
        print('Checking the group {} for `Main` datasets.'.format(parent.name))
    parent.visititems(__check)

    return main_list


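# Illustrative usage sketch (not part of the original module). The file name
# 'example.h5' is a hypothetical USID-formatted HDF5 file:
#
#     import h5py
#
#     with h5py.File('example.h5', mode='r') as h5_file:
#         for h5_main in get_all_main(h5_file):
#             print(h5_main.name)

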
def find_dataset(h5_group, dset_name):
    """
    Uses visit() to find all datasets with the desired name

    Parameters
    ----------
    h5_group : :class:`h5py.Group`
        Group to search within for the Dataset
    dset_name : str
        Name of the dataset to search for

    Returns
    -------
    datasets : list
        List of datasets (as USIDataset objects where possible) whose names
        match `dset_name`.
    """
    from ..usi_data import USIDataset

    datasets = list()
    for obj in hut.find_dataset(h5_group, dset_name):
        try:
            datasets.append(USIDataset(obj))
        except TypeError:
            datasets.append(obj)

    return datasets


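# Illustrative sketch (not part of the original module): locating all datasets
# named 'Raw_Data' anywhere under a file. The file and dataset names are
# assumptions for illustration:
#
#     with h5py.File('example.h5', mode='r') as h5_file:
#         for dset in find_dataset(h5_file, 'Raw_Data'):
#             # Main datasets are upgraded to USIDataset; others remain
#             # plain h5py.Dataset objects
#             print(type(dset), dset.name)

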
def find_results_groups(h5_main, tool_name, h5_parent_group=None):
    """
    Finds a list of all groups containing results of the process of name
    `tool_name` being applied to the dataset

    Parameters
    ----------
    h5_main : h5 dataset reference
        Reference to the target dataset to which the tool was applied
    tool_name : String / unicode
        Name of the tool applied to the target dataset
    h5_parent_group : h5py.Group, optional. Default = None
        Parent group under which the results group will be searched for. Use
        this option when the results groups are contained in a different HDF5
        file than `h5_main`. By default, this function will search within the
        same group that contains `h5_main`

    Returns
    -------
    groups : list of references to :class:`h5py.Group` objects
        groups whose name contains the tool name and the dataset name
    """
    if not isinstance(h5_main, h5py.Dataset):
        raise TypeError('h5_main should be a h5py.Dataset object')
    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if h5_parent_group is not None:
        if not isinstance(h5_parent_group, (h5py.File, h5py.Group)):
            raise TypeError("'h5_parent_group' should either be a h5py.File "
                            "or h5py.Group object")
    else:
        h5_parent_group = h5_main.parent

    dset_name = h5_main.name.split('/')[-1]
    groups = []
    for key in h5_parent_group.keys():
        if dset_name in key and tool_name in key and \
                isinstance(h5_parent_group[key], h5py.Group):
            groups.append(h5_parent_group[key])
    return groups


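# Illustrative sketch (not part of the original module). For a main dataset at
# a hypothetical path '/Measurement_000/Raw_Data', this lists sibling groups
# such as 'Raw_Data-Clustering_000' left behind by earlier processing:
#
#     h5_main = h5_file['/Measurement_000/Raw_Data']
#     for h5_grp in find_results_groups(h5_main, 'Clustering'):
#         print(h5_grp.name)

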
def validate_main_dset(h5_main, must_be_h5):
    """
    Checks to make sure that the provided object is a USID main dataset
    Errors in parameters will result in Exceptions

    Parameters
    ----------
    h5_main : h5py.Dataset or numpy.ndarray or Dask.array.core.array
        object that represents the USID main data
    must_be_h5 : bool
        Set to True if expecting an h5py.Dataset object.
        Set to False if expecting a numpy.ndarray or Dask.array.core.array
    """
    # Check that h5_main is a dataset
    if must_be_h5:
        if not isinstance(h5_main, h5py.Dataset):
            raise TypeError('{} is not an HDF5 Dataset object.'.format(h5_main))
    else:
        if not isinstance(h5_main, (np.ndarray, da.core.Array)):
            raise TypeError('raw_data should either be a np.ndarray or a '
                            'da.core.Array')

    # Check dimensionality
    if len(h5_main.shape) != 2:
        raise ValueError('Main data is not 2D. Provided object has shape: '
                         '{}'.format(h5_main.shape))


def validate_anc_h5_dsets(h5_inds, h5_vals, main_shape, is_spectroscopic=True):
    """
    Checks ancillary HDF5 datasets against the shape of a main dataset.
    Errors in parameters will result in Exceptions

    Parameters
    ----------
    h5_inds : h5py.Dataset
        HDF5 dataset corresponding to the ancillary Indices dataset
    h5_vals : h5py.Dataset
        HDF5 dataset corresponding to the ancillary Values dataset
    main_shape : array-like
        Shape of the main dataset expressed as a tuple or similar
    is_spectroscopic : bool, Optional. Default = True
        set to True if ``dims`` correspond to Spectroscopic Dimensions.
        False otherwise.
    """
    if not isinstance(h5_inds, h5py.Dataset):
        raise TypeError('h5_inds must be a h5py.Dataset object')
    if not isinstance(h5_vals, h5py.Dataset):
        raise TypeError('h5_vals must be a h5py.Dataset object')
    if h5_inds.shape != h5_vals.shape:
        raise ValueError('h5_inds: {} and h5_vals: {} should be of the same '
                         'shape'.format(h5_inds.shape, h5_vals.shape))
    if isinstance(main_shape, (list, tuple)):
        if not contains_integers(main_shape, min_val=1) or \
                len(main_shape) != 2:
            raise ValueError("'main_shape' must be a valid HDF5 dataset shape")
    else:
        raise TypeError('main_shape should be of the following types: '
                        'h5py.Dataset, tuple, or list. {} provided'
                        ''.format(type(main_shape)))

    if h5_inds.shape[is_spectroscopic] != main_shape[is_spectroscopic]:
        raise ValueError('index {} in shape of h5_inds: {} and main_data: {} '
                         'should be equal'.format(int(is_spectroscopic),
                                                  h5_inds.shape, main_shape))


def validate_dims_against_main(main_shape, dims, is_spectroscopic=True):
    """
    Checks Dimension objects against a given shape for main datasets.
    Errors in parameters will result in Exceptions

    Parameters
    ----------
    main_shape : array-like
        Tuple or list with the shape of the main data
    dims : iterable
        List of Dimension objects
    is_spectroscopic : bool, Optional. Default = True
        set to True if ``dims`` correspond to Spectroscopic Dimensions.
        False otherwise.
    """
    if not isinstance(main_shape, (list, tuple)):
        raise TypeError('main_shape should be a list or tuple. Provided object'
                        ' was of type: {}'.format(type(main_shape)))
    if len(main_shape) != 2:
        raise ValueError('"main_shape" should be of length 2')
    if not contains_integers(main_shape, min_val=1):
        raise ValueError('"main_shape" should contain integers greater than 0')

    if isinstance(dims, Dimension):
        dims = [dims]
    elif not isinstance(dims, (list, tuple)):
        raise TypeError('"dims" must be a list or tuple of usid.Dimension '
                        'objects. Provided object was of type: {}'
                        ''.format(type(dims)))
    if not all([isinstance(obj, Dimension) for obj in dims]):
        raise TypeError('One or more objects in "dims" was not usid.Dimension')

    if is_spectroscopic:
        main_dim = 1
        dim_category = 'Spectroscopic'
    else:
        main_dim = 0
        dim_category = 'Position'

    # TODO: This is where the dimension type will need to be taken into account
    lhs = main_shape[main_dim]
    rhs = np.prod([len(x.values) for x in dims])
    if lhs != rhs:
        raise ValueError(dim_category + ' dimensions in main data of size: {} '
                         'do not match with product of values in provided '
                         'Dimension objects: {}'.format(lhs, rhs))


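# Illustrative sketch (not part of the original module), assuming the usual
# pyUSID Dimension constructor: a (3, 8) main dataset with 3 positions passes
# validation against two spectroscopic dimensions of lengths 2 and 4 since
# 2 * 4 == 8:
#
#     dims = [Dimension('Bias', 'V', 2), Dimension('Cycle', '', 4)]
#     validate_dims_against_main((3, 8), dims, is_spectroscopic=True)

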
def check_if_main(h5_main, verbose=False):
    """
    Checks the input dataset to see if it has all the necessary
    features to be considered a Main dataset. This means it is
    2D and has the following attributes:

    * Position_Indices
    * Position_Values
    * Spectroscopic_Indices
    * Spectroscopic_Values
    * quantity
    * units

    In addition, the shapes of the ancillary matrices should match with that
    of h5_main

    Parameters
    ----------
    h5_main : HDF5 Dataset
        Dataset of interest
    verbose : Boolean (Optional. Default = False)
        Whether or not to print statements

    Returns
    -------
    success : Boolean
        True if all tests pass
    """
    try:
        validate_main_dset(h5_main, True)
    except Exception as exep:
        if verbose:
            print(exep)
        return False

    h5_name = h5_main.name.split('/')[-1]

    success = True

    # Check for Datasets
    dset_names = ['Position_Indices', 'Position_Values',
                  'Spectroscopic_Indices', 'Spectroscopic_Values']
    for name in dset_names:
        try:
            h5_anc_dset = h5_main.file[h5_main.attrs[name]]
            success = np.all([success, isinstance(h5_anc_dset, h5py.Dataset)])
        except Exception:
            if verbose:
                print('{} not found as an attribute of {}.'.format(name,
                                                                   h5_name))
            return False

    attr_success = np.all([att in h5_main.attrs for att in ['quantity',
                                                            'units']])
    if not attr_success:
        if verbose:
            print('{} does not have the mandatory "quantity" and "units" '
                  'attributes'.format(h5_main.name))
        return False

    for attr_name in ['quantity', 'units']:
        val = get_attr(h5_main, attr_name)
        if not isinstance(val, (str, unicode)):
            if verbose:
                print('Attribute {} of {} found to be {}. Expected a string'
                      ''.format(attr_name, h5_main.name, val))
            return False

    # Blindly linking four datasets is still not sufficient.
    # The sizes need to match:
    anc_shape_match = list()
    h5_pos_inds = h5_main.file[h5_main.attrs['Position_Indices']]
    h5_pos_vals = h5_main.file[h5_main.attrs['Position_Values']]
    anc_shape_match.append(np.all(h5_pos_vals.shape == h5_pos_inds.shape))
    for anc_dset in [h5_pos_vals, h5_pos_inds]:
        anc_shape_match.append(np.all(h5_main.shape[0] == anc_dset.shape[0]))
    if not np.all(anc_shape_match):
        if verbose:
            print('The shapes of the Position indices:{}, values:{} datasets '
                  'did not match with that of the main dataset: {}'
                  ''.format(h5_pos_inds.shape, h5_pos_vals.shape,
                            h5_main.shape))
        return False

    anc_shape_match = list()
    h5_spec_inds = h5_main.file[h5_main.attrs['Spectroscopic_Indices']]
    h5_spec_vals = h5_main.file[h5_main.attrs['Spectroscopic_Values']]
    anc_shape_match.append(np.all(h5_spec_inds.shape == h5_spec_vals.shape))
    for anc_dset in [h5_spec_inds, h5_spec_vals]:
        anc_shape_match.append(np.all(h5_main.shape[1] == anc_dset.shape[1]))
    if not np.all(anc_shape_match):
        if verbose:
            print('The shapes of the Spectroscopic indices:{}, values:{} '
                  'datasets did not match with that of the main dataset: {}'
                  ''.format(h5_spec_inds.shape, h5_spec_vals.shape,
                            h5_main.shape))
        return False

    try:
        validate_anc_dset_attrs(h5_pos_inds, h5_pos_vals, is_spec=False)
    except ValueError:
        if verbose:
            print('Attributes of Position datasets did not match')
        return False

    try:
        validate_anc_dset_attrs(h5_spec_inds, h5_spec_vals, is_spec=True)
    except ValueError:
        if verbose:
            print('Attributes of Spectroscopic datasets did not match')
        return False

    return success


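# Illustrative sketch (not part of the original module): gating downstream
# code on the checks above. The dataset path is hypothetical:
#
#     h5_dset = h5_file['/Measurement_000/Raw_Data']
#     if check_if_main(h5_dset, verbose=True):
#         from pyUSID import USIDataset
#         h5_main = USIDataset(h5_dset)

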
def validate_anc_dset_attrs(h5_inds, h5_vals, is_spec=True):
    """
    Validates the attributes of a pair of indices and values datasets.
    Throws ValueErrors if any rule is not satisfied

    Parameters
    ----------
    h5_inds : h5py.Dataset
        Indices dataset
    h5_vals : h5py.Dataset
        Values Dataset
    is_spec : bool, optional. Default = True
        Set to True if spectroscopic. Else - Position datasets
    """
    def lists_match(left, right):
        if len(left) != len(right):
            return False
        return all([l_it == r_it for l_it, r_it in zip(left, right)])

    v_names = get_attr(h5_vals, 'labels')
    v_units = get_attr(h5_vals, 'units')
    i_names = get_attr(h5_inds, 'labels')
    i_units = get_attr(h5_inds, 'units')

    for names, units, dset_type in zip([v_names, i_names],
                                       [v_units, i_units],
                                       ['Values', 'Indices']):
        if len(names) != len(units):
            raise ValueError('Length of labels: {} and units: {} for the {} '
                             'dataset do not match'
                             ''.format(len(names), len(units), dset_type))

    for i_item, v_item, prop in zip([i_names, i_units],
                                    [v_names, v_units],
                                    ['labels', 'units']):
        if not lists_match(i_item, v_item):
            raise ValueError('The "{}" values of the Indices: {} and Values: '
                             '{} datasets do not match'.format(prop, i_item,
                                                               v_item))

    # Now check the rows / cols nums against size of any attr:
    if h5_inds.shape != h5_vals.shape:
        raise ValueError('Shape of Indices: {} and Values: {} datasets do '
                         'not match'.format(h5_inds.shape, h5_vals.shape))
    dim_ind = 1
    if is_spec:
        dim_ind = 0
    if h5_inds.shape[dim_ind] != len(v_names):
        raise ValueError('Length of mandatory attributes: {} did not match '
                         'dimension: {} of the ancillary dataset of shape: {}'
                         ''.format(len(v_names), dim_ind, h5_inds.shape))


def check_for_old(h5_base, tool_name, new_parms=None, target_dset=None,
                  h5_parent_group=None, verbose=False):
    """
    Check to see if the results of a tool already exist and if they
    were performed with the same parameters.

    Parameters
    ----------
    h5_base : h5py.Dataset object
        Dataset to which the tool is being applied
    tool_name : str
        process or analysis name
    new_parms : dict, optional
        Parameters with which this tool will be performed.
    target_dset : str, optional, default = None
        Name of the dataset whose attributes will be compared against
        new_parms.
        Default - checking against the group
    h5_parent_group : h5py.Group, optional. Default = None
        The group to search under. Use this option when `h5_base` and the
        potential results groups (within `h5_parent_group`) are located in
        different HDF5 files. Default - search within h5_base.parent
    verbose : bool, optional, default = False
        Whether or not to print debugging statements

    Returns
    -------
    matching_groups : list
        List of all :class:`h5py.Group` objects with parameters matching
        those in `new_parms`
    """
    if not isinstance(h5_base, h5py.Dataset):
        raise TypeError('h5_base should be a h5py.Dataset object')
    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if h5_parent_group is not None:
        if not isinstance(h5_parent_group, (h5py.File, h5py.Group)):
            raise TypeError("'h5_parent_group' should either be a h5py.File "
                            "or h5py.Group object")
    else:
        h5_parent_group = h5_base.parent

    if new_parms is None:
        new_parms = dict()
    else:
        if not isinstance(new_parms, dict):
            raise TypeError('new_parms should be a dict')
    if target_dset is not None:
        target_dset = validate_single_string_arg(target_dset, 'target_dset')

    matching_groups = []
    groups = find_results_groups(h5_base, tool_name,
                                 h5_parent_group=h5_parent_group)

    for group in groups:
        if verbose:
            print('Looking at group - {}'.format(group.name.split('/')[-1]))

        h5_obj = group
        if target_dset is not None:
            if target_dset in group.keys():
                h5_obj = group[target_dset]
            else:
                if verbose:
                    print('{} did not contain the target dataset: {}'
                          ''.format(group.name.split('/')[-1], target_dset))
                continue

        if check_for_matching_attrs(h5_obj, new_parms=new_parms,
                                    verbose=verbose):
            # return group
            matching_groups.append(group)

    return matching_groups


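# Illustrative sketch (not part of the original module): skipping
# recomputation when identical results already exist. The tool and parameter
# names are assumptions:
#
#     parms = {'num_clusters': 4, 'algorithm': 'k-means'}
#     duplicates = check_for_old(h5_main, 'Clustering', new_parms=parms)
#     if len(duplicates) > 0:
#         h5_results_grp = duplicates[-1]  # reuse the most recent match

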
def get_source_dataset(h5_group):
    """
    Find the name of the source dataset used to create the input `h5_group`,
    so long as the source dataset is in the same HDF5 file

    Parameters
    ----------
    h5_group : :class:`h5py.Group`
        Child group whose source dataset will be returned

    Returns
    -------
    h5_source : USIDataset object
        Main dataset from which this group was generated
    """
    if not isinstance(h5_group, h5py.Group):
        raise TypeError('h5_group should be a h5py.Group object')

    h5_parent_group = h5_group.parent
    group_name = h5_group.name.split('/')[-1]
    # What if the group name was not formatted according to Pycroscopy rules?
    name_split = group_name.split('-')
    if len(name_split) != 2:
        raise ValueError("The provided group's name could not be split by '-' "
                         "as expected in SourceDataset-ProcessName_000")

    h5_source = h5_parent_group[name_split[0]]

    if not isinstance(h5_source, h5py.Dataset):
        raise ValueError('Source object was not a dataset!')

    from ..usi_data import USIDataset

    return USIDataset(h5_source)


def assign_group_index(h5_parent_group, base_name, verbose=False):
    """
    Searches the parent h5 group to find the next available index for the group

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` object
        Parent group under which the new group object will be created
    base_name : str or unicode
        Base name of the new group without index
    verbose : bool, optional. Default=False
        Whether or not to print debugging statements

    Returns
    -------
    base_name : str or unicode
        Base name of the new group with the next available index as a suffix
    """
    if not isinstance(h5_parent_group, h5py.Group):
        raise TypeError('h5_parent_group should be a h5py.Group object')
    base_name = validate_single_string_arg(base_name, 'base_name')

    if len(base_name) == 0:
        raise ValueError('base_name should not be an empty string')

    if not base_name.endswith('_'):
        base_name += '_'

    temp = [key for key in h5_parent_group.keys()]
    if verbose:
        print('Looking for group names starting with {} in parent containing '
              'items: {}'.format(base_name, temp))
    previous_indices = []
    for item_name in temp:
        if isinstance(h5_parent_group[item_name], h5py.Group) and \
                item_name.startswith(base_name):
            previous_indices.append(int(item_name.replace(base_name, '')))
    previous_indices = np.sort(previous_indices)
    if verbose:
        print('indices of existing groups with the same prefix: {}'
              ''.format(previous_indices))
    if len(previous_indices) == 0:
        index = 0
    else:
        index = previous_indices[-1] + 1
    return base_name + '{:03d}'.format(index)


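# Illustrative sketch (not part of the original module): if the parent group
# already contains 'Measurement_000' and 'Measurement_001', the call below
# returns 'Measurement_002':
#
#     next_name = assign_group_index(h5_file, 'Measurement')

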
def create_indexed_group(h5_parent_group, base_name):
    """
    Creates a group with an indexed name (eg - 'Measurement_012') under
    h5_parent_group using the provided base_name as a prefix for the group's
    name

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` or :class:`h5py.File`
        File or group within which the new group will be created
    base_name : str or unicode
        Prefix for the group name. This need not end with a '_'. It will be
        added automatically

    Returns
    -------
    h5_new_group : :class:`h5py.Group`
        Newly created group with an indexed name
    """
    if not isinstance(h5_parent_group, (h5py.Group, h5py.File)):
        raise TypeError('h5_parent_group should be a h5py.File or Group object')
    base_name = validate_single_string_arg(base_name, 'base_name')

    group_name = assign_group_index(h5_parent_group, base_name)
    h5_new_group = h5_parent_group.create_group(group_name)
    write_book_keeping_attrs(h5_new_group)
    return h5_new_group


def create_results_group(h5_main, tool_name, h5_parent_group=None):
    """
    Creates a h5py.Group object auto-indexed and named as
    'DatasetName-ToolName_00x'

    Parameters
    ----------
    h5_main : h5py.Dataset object
        Reference to the dataset based on which the process / analysis is
        being performed
    tool_name : string / unicode
        Name of the Process / Analysis applied to h5_main
    h5_parent_group : h5py.Group, optional. Default = None
        Parent group under which the results group will be created. Use this
        option to write results into a new HDF5 file. By default, results will
        be written into the same group containing `h5_main`

    Returns
    -------
    h5_group : :class:`h5py.Group`
        Results group which can now house the results datasets
    """
    if not isinstance(h5_main, h5py.Dataset):
        raise TypeError('h5_main should be a h5py.Dataset object')
    if h5_parent_group is not None:
        if not isinstance(h5_parent_group, (h5py.File, h5py.Group)):
            raise TypeError("'h5_parent_group' should either be a h5py.File "
                            "or h5py.Group object")
    else:
        h5_parent_group = h5_main.parent

    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if '-' in tool_name:
        warn('tool_name should not contain the "-" character. Reformatted '
             'name from:{} to {}'.format(tool_name,
                                         tool_name.replace('-', '_')))
    tool_name = tool_name.replace('-', '_')

    group_name = h5_main.name.split('/')[-1] + '-' + tool_name + '_'
    group_name = assign_group_index(h5_parent_group, group_name)

    h5_group = h5_parent_group.create_group(group_name)

    write_book_keeping_attrs(h5_group)

    # Also add some basic attributes like source and tool name. This will
    # allow relaxation of nomenclature restrictions.
    # These are NOT being used right now but will be in subsequent versions
    # of pyUSID
    write_simple_attrs(h5_group, {'tool': tool_name, 'num_source_dsets': 1})
    # in this case, there is only one source
    if h5_parent_group.file == h5_main.file:
        for dset_ind, dset in enumerate([h5_main]):
            h5_group.attrs['source_' + '{:03d}'.format(dset_ind)] = dset.ref

    return h5_group


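# Illustrative sketch (not part of the original module): for a main dataset
# named 'Raw_Data' and a hypothetical 'Fitting' tool, the group created below
# would be named 'Raw_Data-Fitting_000':
#
#     h5_results_grp = create_results_group(h5_main, 'Fitting')

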
def copy_main_attributes(h5_main, h5_new):
    """
    Copies the units and quantity name from one dataset to another

    Parameters
    ----------
    h5_main : h5py.Dataset
        Dataset containing the target attributes
    h5_new : h5py.Dataset
        Dataset to which the target attributes are to be copied
    """
    for param, param_name in zip([h5_main, h5_new], ['h5_main', 'h5_new']):
        if not isinstance(param, h5py.Dataset):
            raise TypeError(param_name + ' should be a h5py.Dataset object')

    for att_name in ['quantity', 'units']:
        if att_name not in h5_main.attrs:
            raise KeyError('Attribute: {} does not exist in {}'
                           ''.format(att_name, h5_main))
        val = get_attr(h5_main, att_name)
        h5_new.attrs[att_name] = clean_string_att(val)


def create_empty_dataset(source_dset, dtype, dset_name, h5_group=None,
                         new_attrs=None, skip_refs=False):
    """
    Creates an empty dataset in the h5 file based on the provided dataset in
    the same or specified group

    Parameters
    ----------
    source_dset : h5py.Dataset object
        Source object that provides information on the group and shape of the
        dataset
    dtype : dtype
        Data type of the fit / guess datasets
    dset_name : String / Unicode
        Name of the dataset
    h5_group : :class:`h5py.Group`, optional. Default = None
        Group within which this dataset will be created
    new_attrs : dictionary (Optional)
        Any new attributes that need to be written to the dataset
    skip_refs : boolean, optional
        Should ObjectReferences be skipped when copying attributes from the
        `source_dset`

    Returns
    -------
    h5_new_dset : h5py.Dataset object
        Newly created dataset
    """
    if not isinstance(source_dset, h5py.Dataset):
        raise TypeError('source_dset should be a h5py.Dataset object')
    _ = validate_dtype(dtype)
    if new_attrs is not None:
        if not isinstance(new_attrs, dict):
            raise TypeError('new_attrs should be a dictionary')
    else:
        new_attrs = dict()

    if h5_group is None:
        h5_group = source_dset.parent
    else:
        if not isinstance(h5_group, (h5py.Group, h5py.File)):
            raise TypeError('h5_group should be a h5py.Group or h5py.File '
                            'object')

        if source_dset.file != h5_group.file and not skip_refs:
            # Cannot carry over references
            warn('H5 object references will not be copied over since {} is in '
                 'a different HDF5 file than {}'.format(h5_group, source_dset))
            skip_refs = True

    dset_name = validate_single_string_arg(dset_name, 'dset_name')
    if '-' in dset_name:
        warn('dset_name should not contain the "-" character. Reformatted '
             'name from:{} to {}'.format(dset_name,
                                         dset_name.replace('-', '_')))
    dset_name = dset_name.replace('-', '_')

    kwargs = {'shape': source_dset.shape, 'dtype': dtype,
              'compression': source_dset.compression,
              'chunks': source_dset.chunks}

    if source_dset.file.driver == 'mpio':
        if kwargs.pop('compression', None) is not None:
            warn('This HDF5 file has been opened with the "mpio" '
                 'communicator. mpi4py does not allow creation of compressed '
                 'datasets. Compression kwarg has been removed')

    if dset_name in h5_group.keys():
        if isinstance(h5_group[dset_name], h5py.Dataset):
            warn('A dataset named: {} already exists in group: {}'
                 ''.format(dset_name, h5_group.name))
            h5_new_dset = h5_group[dset_name]
            # Make sure it has the correct shape and dtype
            if any((source_dset.shape != h5_new_dset.shape,
                    dtype != h5_new_dset.dtype)):
                warn('Either the shape (existing: {} desired: {}) or dtype '
                     '(existing: {} desired: {}) of the dataset did not match '
                     'with expectations. Deleting and creating a new one.'
                     ''.format(h5_new_dset.shape, source_dset.shape,
                               h5_new_dset.dtype, dtype))
                del h5_new_dset, h5_group[dset_name]
                h5_new_dset = h5_group.create_dataset(dset_name, **kwargs)
        else:
            raise KeyError('{} is already a {} in group: {}'
                           ''.format(dset_name, type(h5_group[dset_name]),
                                     h5_group.name))
    else:
        h5_new_dset = h5_group.create_dataset(dset_name, **kwargs)

    # This should link the ancillary datasets correctly
    h5_new_dset = hut.copy_attributes(source_dset, h5_new_dset,
                                      skip_refs=skip_refs)
    if source_dset.file != h5_group.file:
        hut.copy_linked_objects(source_dset, h5_new_dset)
    h5_new_dset.attrs.update(new_attrs)

    if check_if_main(h5_new_dset):
        from ..usi_data import USIDataset

        h5_new_dset = USIDataset(h5_new_dset)
        # update book keeping attributes
        write_book_keeping_attrs(h5_new_dset)

    return h5_new_dset


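# Illustrative sketch (not part of the original module): allocating a dataset
# with the same shape and ancillary links as h5_main but a different dtype,
# as is typical for guess / fit results. The names are assumptions:
#
#     h5_guess = create_empty_dataset(h5_main, np.float32, 'Guess',
#                                     h5_group=h5_results_grp)

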
def check_for_matching_attrs(h5_obj, new_parms=None, verbose=False):
    """
    Compares attributes in the given H5 object against those in the provided
    dictionary and returns True if the parameters match, and False otherwise

    Parameters
    ----------
    h5_obj : h5py object (Dataset or :class:`h5py.Group`)
        Object whose attributes will be compared against new_parms
    new_parms : dict, optional. default = empty dictionary
        Parameters to compare against the attributes present in h5_obj
    verbose : bool, optional, default = False
        Whether or not to print debugging statements

    Returns
    -------
    tests : bool
        Whether or not all parameters in new_parms matched with those in
        h5_obj's attributes
    """
    if not isinstance(h5_obj, (h5py.Dataset, h5py.Group, h5py.File)):
        raise TypeError('h5_obj should be a h5py.Dataset, h5py.Group, or '
                        'h5py.File object')
    if new_parms is None:
        new_parms = dict()
    else:
        if not isinstance(new_parms, dict):
            raise TypeError('new_parms should be a dictionary')

    tests = []
    for key in new_parms.keys():

        if verbose:
            print('Looking for new attribute named: {}'.format(key))

        # HDF5 cannot store None as an attribute anyway. ignore
        if new_parms[key] is None:
            continue

        try:
            old_value = get_attr(h5_obj, key)
        except KeyError:
            # if parameter was not found assume that something has changed
            if verbose:
                print('New parm: {} \t- new parm not in group *****'
                      ''.format(key))
            tests.append(False)
            break

        if isinstance(old_value, np.ndarray):
            if not isinstance(new_parms[key], collections.Iterable):
                if verbose:
                    print('New parm: {} \t- new parm not iterable unlike old '
                          'parm *****'.format(key))
                tests.append(False)
                break
            new_array = np.array(new_parms[key])
            if old_value.size != new_array.size:
                if verbose:
                    print('New parm: {} \t- are of different sizes ****'
                          ''.format(key))
                tests.append(False)
            else:
                try:
                    answer = np.allclose(old_value, new_array)
                except TypeError:
                    # comes here when comparing string arrays
                    # Not sure of a better way
                    answer = []
                    for old_val, new_val in zip(old_value, new_array):
                        answer.append(old_val == new_val)
                    answer = np.all(answer)
                if verbose:
                    print('New parm: {} \t- match: {}'.format(key, answer))
                tests.append(answer)
        else:
            """if isinstance(new_parms[key], collections.Iterable):
                if verbose:
                    print('New parm: {} \t- new parm is iterable unlike old '
                          'parm *****'.format(key))
                tests.append(False)
                break"""
            answer = np.all(new_parms[key] == old_value)
            if verbose:
                print('New parm: {} \t- match: {}'.format(key, answer))
            tests.append(answer)
    if verbose:
        print('')

    return all(tests)


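# Illustrative sketch (not part of the original module): comparing attributes
# of a hypothetical results group against the parameters of a new computation:
#
#     if check_for_matching_attrs(h5_results_grp,
#                                 new_parms={'num_clusters': 4}):
#         print('Identical parameters - results can be reused')

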
def write_ind_val_dsets(h5_parent_group, dimensions, is_spectral=True,
                        verbose=False, base_name=None, slow_to_fast=False):
    """
    Creates h5py.Datasets for the position OR spectroscopic indices and values
    of the data.
    Remember that the contents of the datasets can be changed if need be after
    the creation of the datasets. For example if one of the spectroscopic
    dimensions (e.g. - Bias) was sinusoidal and not linear, the specific
    dimension in the Spectroscopic_Values dataset can be manually overwritten.

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` or :class:`h5py.File`
        Group under which the indices and values datasets will be created
    dimensions : Dimension or array-like of Dimension objects
        Sequence of Dimension objects that provides all necessary instructions
        for constructing the indices and values datasets
    is_spectral : bool, optional. default = True
        Spectroscopic (True) or Position (False)
    verbose : Boolean, optional
        Whether or not to print statements for debugging purposes
    base_name : str or unicode, optional
        Prefix for the datasets. Default: 'Position' when is_spectral is
        False, 'Spectroscopic' otherwise
    slow_to_fast : bool, Optional. Default=False
        Set to True if the dimensions are arranged from slowest varying to
        fastest varying. Set to False otherwise.

    Returns
    -------
    h5_inds : h5py.Dataset
        Dataset containing the position or spectroscopic indices
    h5_vals : h5py.Dataset
        Dataset containing the position or spectroscopic values

    Notes
    -----
    Dimensions should be in the order from fastest varying to slowest.
    """
    if isinstance(dimensions, Dimension):
        dimensions = [dimensions]
    if not isinstance(dimensions, (list, np.ndarray, tuple)):
        raise TypeError('dimensions should be array-like ')
    if not np.all([isinstance(x, Dimension) for x in dimensions]):
        raise TypeError('dimensions should be a sequence of Dimension objects')

    if not isinstance(h5_parent_group, (h5py.Group, h5py.File)):
        raise TypeError('h5_parent_group should be a h5py.File or Group '
                        'object')
    if not is_editable_h5(h5_parent_group):
        raise ValueError('The provided h5 object is not valid / open')

    if base_name is not None:
        base_name = validate_single_string_arg(base_name, 'base_name')
        if not base_name.endswith('_'):
            base_name += '_'
    else:
        base_name = 'Position_'
        if is_spectral:
            base_name = 'Spectroscopic_'

    if not slow_to_fast:
        warn('In the future write_ind_val_dsets will default to requiring '
             'dimensions to be arranged from slowest to fastest varying')

    # check if the datasets already exist. If they do, there's no point in
    # going any further
    for sub_name in ['Indices', 'Values']:
        if base_name + sub_name in h5_parent_group.keys():
            raise KeyError('Dataset: {} already exists in provided group: {}'
                           ''.format(base_name + sub_name,
                                     h5_parent_group.name))

    modes = [dim.mode for dim in dimensions]
    sing_mode = np.unique(modes)

    if sing_mode.size > 1:
        raise NotImplementedError('Cannot yet work on combinations of modes '
                                  'for Dimensions. Consider doing manually')

    sing_mode = sing_mode[0]

    if sing_mode == DimType.DEFAULT:
        if slow_to_fast:
            # Ensure that the dimensions are arranged from fast to slow instead
            dimensions = dimensions[::-1]
        indices, values = build_ind_val_matrices([dim.values for dim in
                                                  dimensions],
                                                 is_spectral=is_spectral)
        # At this point, dimensions and unit values are arranged from fastest
        # to slowest. We want dimensions to be arranged from slowest to
        # fastest:
        rev_func = np.flipud if is_spectral else np.fliplr
        dimensions = dimensions[::-1]
        indices = rev_func(indices)
        values = rev_func(values)

    elif sing_mode == DimType.INCOMPLETE:
        lengths = np.unique([len(dim.values) for dim in dimensions])
        if len(lengths) > 1:
            raise ValueError('Values for dimensions not of same length')
        single_dim = np.arange(lengths[0], dtype=INDICES_DTYPE)
        indices = np.tile(single_dim, (2, 1)).T
        values = np.dstack(tuple([dim.values for dim in dimensions])).squeeze()

        if is_spectral:
            indices = indices.T
            values = values.T
    else:
        raise NotImplementedError('Cannot yet work on Dependent dimensions')

    if verbose:
        print('Indices:')
        print(indices)
        print('Values:')
        print(values)

    # Create the Datasets for both Indices and Values
    h5_indices = h5_parent_group.create_dataset(base_name + 'Indices',
                                                data=INDICES_DTYPE(indices),
                                                dtype=INDICES_DTYPE)
    h5_values = h5_parent_group.create_dataset(base_name + 'Values',
                                               data=VALUES_DTYPE(values),
                                               dtype=VALUES_DTYPE)

    for h5_dset in [h5_indices, h5_values]:
        write_simple_attrs(h5_dset, {'units': [x.units for x in dimensions],
                                     'labels': [x.name for x in dimensions],
                                     'type': [dim.mode.value for dim in
                                              dimensions]})

    warn('pyUSID.io.hdf_utils.simple.write_ind_val_dsets no longer creates '
         'region references for each dimension. Please use '
         'pyUSID.io.reg_ref.write_region_references to manually create region '
         'references')

    return h5_indices, h5_values


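# Illustrative sketch (not part of the original module): building
# Spectroscopic ancillary datasets for two hypothetical dimensions, 'Bias'
# (fastest varying) and 'Cycle' (slowest varying):
#
#     dims = [Dimension('Bias', 'V', np.linspace(-2, 2, 128)),
#             Dimension('Cycle', '', 3)]
#     h5_spec_inds, h5_spec_vals = write_ind_val_dsets(h5_group, dims,
#                                                      is_spectral=True)

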
def write_reduced_anc_dsets(h5_parent_group, h5_inds, h5_vals, dim_name,
                            basename=None, is_spec=None, verbose=False):
    """
    Creates new Ancillary Indices and Values datasets from the input datasets
    by dropping the specified dimensions

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` or h5py.File
        Group under which the indices and values datasets will be created
    h5_inds : HDF5 Dataset
        Spectroscopic or Position indices dataset
    h5_vals : HDF5 Dataset
        Spectroscopic or Position values dataset
    dim_name : str or unicode or list of strings
        Names of the dimension(s) to remove
    basename : str or unicode, Optional
        String to which '_Indices' and '_Values' will be appended to get the
        names of the new datasets. Default = 'Position' or 'Spectroscopic'
    is_spec : bool, optional
        Whether or not the provided ancillary datasets are position or
        spectroscopic.
        The user is recommended to supply this parameter whenever it is known
        or possible. By default, this function will attempt to recognize the
        answer based on the shape of the datasets.
    verbose : bool, optional. Default = False
        Whether or not to print debugging print statements

    Returns
    -------
    h5_inds_new : h5py.Dataset
        Reduced indices dataset
    h5_vals_new : h5py.Dataset
        Reduced values dataset
    """
    if not isinstance(h5_parent_group, (h5py.Group, h5py.File)):
        raise TypeError('h5_parent_group should either be a h5py.Group or '
                        'h5py.File object')

    for param, param_name in zip([h5_inds, h5_vals], ['h5_inds', 'h5_vals']):
        if not isinstance(param, h5py.Dataset):
            raise TypeError(param_name + ' should be a h5py.Dataset object')

    if dim_name is not None:
        dim_name = validate_list_of_strings(dim_name, 'dim_name')

    all_dim_names = list(get_attr(h5_inds, 'labels'))
    for item in dim_name:
        if item not in all_dim_names:
            raise KeyError('Requested dimension: {} not in the list of '
                           'labels: {}'.format(item, all_dim_names))

    ind_mat = h5_inds[()]
    val_mat = h5_vals[()]

    if is_spec is None:
        # Attempt to recognize the type automatically
        is_spec = False
        if ind_mat.shape[0] == ind_mat.shape[1]:
            raise ValueError('Unable to automatically guess whether the '
                             'provided datasets are position or '
                             'spectroscopic. Please explicitly specify via '
                             'the "is_spec" boolean kwarg')
        if ind_mat.shape[0] < ind_mat.shape[1]:
            is_spec = True
    else:
        if not isinstance(is_spec, bool):
            raise TypeError('is_spec should be a boolean. Provided object is '
                            'of type: {}'.format(type(is_spec)))

    if basename is not None:
        basename = validate_single_string_arg(basename, 'basename')
        if basename.endswith('_'):
            basename = basename[:-1]
    else:
        if is_spec:
            basename = 'Spectroscopic'
        else:
            basename = 'Position'

    for sub_name in ['_Indices', '_Values']:
        if basename + sub_name in h5_parent_group.keys():
            raise KeyError('Dataset: {} already exists in provided group: {}'
                           ''.format(basename + sub_name,
                                     h5_parent_group.name))

    if set(dim_name) != set(all_dim_names):
        # At least one dimension will remain

        if verbose:
            print('All Dimensions: {}. Dimensions to be removed: {}'
                  ''.format(all_dim_names, dim_name))

        if not is_spec:
            # Convert to spectral shape
            ind_mat = np.transpose(ind_mat)
            val_mat = np.transpose(val_mat)

        # For all dimensions, find where the index = 0
        # basically, we are indexing all dimensions to 0
        first_indices = []
        keep_dim = np.ones(len(all_dim_names), dtype=bool)
        for cur_dim in dim_name:
            dim_ind = all_dim_names.index(cur_dim)
            keep_dim[dim_ind] = False
            # check equality against the minimum value instead of 0 to account
            # for cases when a dimension does not start from 0 (already been
            # sliced) - think of multi-dimensional slicing!
            first_indices.append(ind_mat[dim_ind] == np.min(ind_mat[dim_ind]))
        first_indices = np.vstack(first_indices)

        if verbose:
            print('Raw first_indices:')
            print(first_indices)
            print('Dimensions to keep: {}'.format(keep_dim))

        step_starts = np.all(first_indices, axis=0)

        if verbose:
            print('Columns in dataset to keep:')
            print(step_starts)

        '''
        Extract all rows that we want to keep from input indices and values
        '''
        # TODO: handle TypeError: Indexing elements must be in increasing order
        ind_mat = ind_mat[keep_dim, :][:, step_starts]
        val_mat = val_mat[keep_dim, :][:, step_starts]

        if not is_spec:
            # Convert back to position shape
            ind_mat = np.transpose(ind_mat)
            val_mat = np.transpose(val_mat)

        '''
        Create new Datasets to hold the data
        Name them based on basename
        '''
        h5_inds_new = h5_parent_group.create_dataset(basename + '_Indices',
                                                     data=ind_mat,
                                                     dtype=h5_inds.dtype)
        h5_vals_new = h5_parent_group.create_dataset(basename + '_Values',
                                                     data=val_mat,
                                                     dtype=h5_vals.dtype)

        # Extracting the labels from the original spectroscopic data sets
        labels = h5_inds.attrs['labels'][keep_dim]

        # Adding the labels and units to the new spectroscopic data sets
        for dset in [h5_inds_new, h5_vals_new]:
            write_simple_attrs(dset,
                               {'labels': labels,
                                'units': h5_inds.attrs['units'][keep_dim]})

    else:
        # Remove all dimensions:
        h5_inds_new = h5_parent_group.create_dataset(basename + '_Indices',
                                                     data=np.array([[0]]),
                                                     dtype=INDICES_DTYPE)
        h5_vals_new = h5_parent_group.create_dataset(basename + '_Values',
                                                     data=np.array([[0]]),
                                                     dtype=VALUES_DTYPE)

        for dset in [h5_inds_new, h5_vals_new]:
            write_simple_attrs(dset, {'labels': ['Single_Step'],
                                      'units': ['a. u.']})

    return h5_inds_new, h5_vals_new


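# Illustrative sketch (not part of the original module): dropping a
# hypothetical 'Cycle' dimension from existing Spectroscopic ancillary
# datasets, e.g. before writing cycle-averaged results:
#
#     h5_inds_new, h5_vals_new = write_reduced_anc_dsets(
#         h5_results_grp, h5_spec_inds, h5_spec_vals, 'Cycle', is_spec=True)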