Source code for sidpy.hdf.prov_utils

# -*- coding: utf-8 -*-
"""
Tools for tracking provenance within HDF5 files

Created on Tue Nov  3 21:14:25 2015

@author: Suhas Somnath, Chris Smith
"""
from __future__ import division, print_function, absolute_import, \
    unicode_literals
import sys
from warnings import warn
import h5py
import numpy as np

if sys.version_info.major == 3:
    from collections.abc import Iterable
    unicode = str
else:
    from collections import Iterable

from sidpy.base.string_utils import validate_single_string_arg
from sidpy.hdf.hdf_utils import get_attr, write_book_keeping_attrs, \
    write_simple_attrs


def assign_group_index(h5_parent_group, base_name, verbose=False):
    """
    Searches the parent h5 group to find the next available index for the
    group

    Only child *groups* whose name is ``base_name`` followed by an integer
    are considered. Children that merely share the prefix (for example
    ``<base_name>notes``) are ignored instead of causing a failure.

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` object
        Parent group under which the new group object will be created
    base_name : str or unicode
        Base name of the new group without index. A trailing ``'_'`` is
        appended if it is not already present
    verbose : bool, optional. Default=False
        Whether or not to print debugging statements

    Returns
    -------
    base_name : str or unicode
        Base name of the new group with the next available index as a
        suffix, zero-padded to at least three digits

    Raises
    ------
    TypeError
        If ``h5_parent_group`` is not a :class:`h5py.Group`
    ValueError
        If ``base_name`` is an empty string
    """
    if not isinstance(h5_parent_group, h5py.Group):
        raise TypeError('h5_parent_group should be a h5py.Group object')
    base_name = validate_single_string_arg(base_name, 'base_name')

    if len(base_name) == 0:
        raise ValueError('base_name should not be an empty string')

    if not base_name.endswith('_'):
        base_name += '_'

    temp = list(h5_parent_group.keys())
    if verbose:
        print('Looking for group names starting with {} in parent containing items: '
              '{}'.format(base_name, temp))

    previous_indices = []
    for item_name in temp:
        # Cheap name test first so that unrelated children are never
        # dereferenced
        if not item_name.startswith(base_name):
            continue
        if not isinstance(h5_parent_group[item_name], h5py.Group):
            continue
        # Strip only the leading prefix. str.replace would also remove any
        # later occurrence of the prefix inside the name
        suffix = item_name[len(base_name):]
        try:
            previous_indices.append(int(suffix))
        except ValueError:
            # Shares the prefix but is not an indexed sibling - not ours
            if verbose:
                print('Ignoring group: {} since its suffix is not an '
                      'index'.format(item_name))

    previous_indices = np.sort(previous_indices)
    if verbose:
        print('indices of existing groups with the same prefix: {}'.format(previous_indices))

    if len(previous_indices) == 0:
        index = 0
    else:
        index = previous_indices[-1] + 1
    return base_name + '{:03d}'.format(index)
def create_indexed_group(h5_parent_group, base_name):
    """
    Creates a new, automatically indexed group (eg - 'Measurement_012')
    inside ``h5_parent_group``, using ``base_name`` as the prefix of the
    group's name, and writes the book-keeping attributes to it

    Parameters
    ----------
    h5_parent_group : :class:`h5py.Group` or :class:`h5py.File`
        File or group within which the new group will be created
    base_name : str or unicode
        Prefix for the group name. This need not end with a '_'. It will be
        added automatically

    Returns
    -------
    h5_new_group : :class:`h5py.Group`
        The newly created group

    Raises
    ------
    TypeError
        If ``h5_parent_group`` is neither a :class:`h5py.File` nor a
        :class:`h5py.Group`
    """
    accepted_parents = (h5py.Group, h5py.File)
    if not isinstance(h5_parent_group, accepted_parents):
        raise TypeError('h5_parent_group should be a h5py.File or Group object')

    prefix = validate_single_string_arg(base_name, 'base_name')
    # The index is resolved against the current children before creation
    indexed_name = assign_group_index(h5_parent_group, prefix)

    h5_new_group = h5_parent_group.create_group(indexed_name)
    write_book_keeping_attrs(h5_new_group)
    return h5_new_group
def create_results_group(h5_main, tool_name, h5_parent_group=None):
    """
    Creates an auto-indexed :class:`h5py.Group` named as
    'DatasetName-ToolName_00x' to hold the results of a tool applied to
    ``h5_main``

    Besides the book-keeping attributes, the attributes ``tool`` and
    ``num_source_dsets`` are written to the new group. A reference to
    ``h5_main`` is stored as the attribute ``source_000`` only when the
    parent group lives in the same file as ``h5_main``.

    Parameters
    ----------
    h5_main : h5py.Dataset object
        Reference to the dataset based on which the process / analysis is
        being performed
    tool_name : string / unicode
        Name of the Process / Analysis applied to h5_main. Any '-' character
        is replaced by '_' (with a warning)
    h5_parent_group : h5py.Group, optional. Default = None
        Parent group under which the results group will be created. Use this
        option to write results into a new HDF5 file. By default, results
        will be written into the same group containing `h5_main`

    Returns
    -------
    h5_group : :class:`h5py.Group`
        Results group which can now house the results datasets

    Raises
    ------
    TypeError
        If ``h5_main`` is not a :class:`h5py.Dataset` or if
        ``h5_parent_group`` is neither a :class:`h5py.File` nor a
        :class:`h5py.Group`
    """
    # TODO: Revise significantly. Avoid parent dataset name
    # Consider embedding refs to source datasets as attributes of group
    warn('The behavior of create_results_group is very likely to change soon '
         'and significantly. Use this function with caution', FutureWarning)

    if not isinstance(h5_main, h5py.Dataset):
        raise TypeError('h5_main should be a h5py.Dataset object')

    if h5_parent_group is None:
        h5_parent_group = h5_main.parent
    elif not isinstance(h5_parent_group, (h5py.File, h5py.Group)):
        raise TypeError("'h5_parent_group' should either be a h5py.File "
                        "or h5py.Group object")

    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if '-' in tool_name:
        # '-' is reserved as the separator between dataset and tool names
        sanitized = tool_name.replace('-', '_')
        warn('tool_name should not contain the "-" character. Reformatted name from:{} to '
             '{}'.format(tool_name, sanitized))
        tool_name = sanitized

    dset_name = h5_main.name.split('/')[-1]
    prefix = dset_name + '-' + tool_name + '_'
    h5_group = h5_parent_group.create_group(
        assign_group_index(h5_parent_group, prefix))

    write_book_keeping_attrs(h5_group)

    # Also add some basic attributes like source and tool name. This will
    # allow relaxation of nomenclature restrictions:
    # these are NOT being used right now but will be in the subsequent
    # versions of pyNSID
    write_simple_attrs(h5_group, {'tool': tool_name, 'num_source_dsets': 1})

    # in this case, there is only one source
    # Object references are only written when both objects share a file
    if h5_parent_group.file == h5_main.file:
        sources = (h5_main,)
        for position, h5_source in enumerate(sources):
            attr_name = 'source_' + '{:03d}'.format(position)
            h5_group.attrs[attr_name] = h5_source.ref

    return h5_group
def find_results_groups(h5_main, tool_name, h5_parent_group=None):
    """
    Collects every group that appears to hold results of the tool named
    ``tool_name`` applied to the dataset ``h5_main``

    A child of the parent group is selected when it is a
    :class:`h5py.Group` and its name contains both the name of the dataset
    and ``tool_name`` as sub-strings.

    Parameters
    ----------
    h5_main : h5 dataset reference
        Reference to the target dataset to which the tool was applied
    tool_name : String / unicode
        Name of the tool applied to the target dataset
    h5_parent_group : h5py.Group, optional. Default = None
        Parent group under which the results group will be searched for. Use
        this option when the results groups are contained in different HDF5
        file compared to `h5_main`. By default, this function will search
        within the same group that contains `h5_main`

    Returns
    -------
    groups : list of references to :class:`h5py.Group` objects
        groups whose name contains the tool name and the dataset name

    Raises
    ------
    TypeError
        If ``h5_main`` is not a :class:`h5py.Dataset` or if
        ``h5_parent_group`` is neither a :class:`h5py.File` nor a
        :class:`h5py.Group`
    """
    warn('The behavior of find_results_group is very likely to change soon '
         'and significantly. Use this function with caution', FutureWarning)

    if not isinstance(h5_main, h5py.Dataset):
        raise TypeError('h5_main should be a h5py.Dataset object')

    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if h5_parent_group is None:
        h5_parent_group = h5_main.parent
    elif not isinstance(h5_parent_group, (h5py.File, h5py.Group)):
        raise TypeError("'h5_parent_group' should either be a h5py.File "
                        "or h5py.Group object")

    dset_name = h5_main.name.split('/')[-1]

    # Name tests come first so that only plausible candidates are
    # dereferenced
    return [h5_parent_group[child_name]
            for child_name in h5_parent_group.keys()
            if dset_name in child_name
            and tool_name in child_name
            and isinstance(h5_parent_group[child_name], h5py.Group)]
def check_for_old(h5_base, tool_name, new_parms=None, target_dset=None,
                  h5_parent_goup=None, verbose=False):
    """
    Looks for existing results of a tool and reports those that were
    computed with the same parameters.

    Parameters
    ----------
    h5_base : h5py.Dataset object
        Dataset on which the tool is being applied to
    tool_name : str
        process or analysis name
    new_parms : dict, optional
        Parameters with which this tool will be performed.
    target_dset : str, optional, default = None
        Name of the dataset whose attributes will be compared against
        new_parms. Default - checking against the group. Results groups
        that do not contain this dataset are skipped
    h5_parent_goup : h5py.Group, optional. Default = None
        The group to search under. Use this option when `h5_base` and
        the potential results groups (within `h5_parent_goup`) are located
        in different HDF5 files. Default - search within h5_base.parent
    verbose : bool, optional, default = False
        Whether or not to print debugging statements

    Returns
    -------
    group : list
        List of all :class:`h5py.Group` objects with parameters matching
        those in `new_parms`

    Raises
    ------
    TypeError
        If ``h5_base`` is not a :class:`h5py.Dataset`, if
        ``h5_parent_goup`` is neither a :class:`h5py.File` nor a
        :class:`h5py.Group`, or if ``new_parms`` is not a dict
    """
    warn('The behavior of check_for_old is very likely to change soon '
         '. Use this function with caution', FutureWarning)

    if not isinstance(h5_base, h5py.Dataset):
        raise TypeError('h5_base should be a h5py.Dataset object')

    tool_name = validate_single_string_arg(tool_name, 'tool_name')

    if h5_parent_goup is None:
        h5_parent_goup = h5_base.parent
    elif not isinstance(h5_parent_goup, (h5py.File, h5py.Group)):
        raise TypeError("'h5_parent_group' should either be a h5py.File "
                        "or h5py.Group object")

    if new_parms is None:
        new_parms = dict()
    elif not isinstance(new_parms, dict):
        raise TypeError('new_parms should be a dict')

    if target_dset is not None:
        target_dset = validate_single_string_arg(target_dset, 'target_dset')

    candidates = find_results_groups(h5_base, tool_name,
                                     h5_parent_group=h5_parent_goup)

    matching_groups = []
    for group in candidates:
        if verbose:
            print('Looking at group - {}'.format(group.name.split('/')[-1]))

        # Compare against the group itself unless a dataset was requested
        if target_dset is None:
            h5_obj = group
        elif target_dset in group.keys():
            h5_obj = group[target_dset]
        else:
            if verbose:
                print('{} did not contain the target dataset: {}'.format(group.name.split('/')[-1],
                                                                         target_dset))
            continue

        if check_for_matching_attrs(h5_obj, new_parms=new_parms, verbose=verbose):
            matching_groups.append(group)

    return matching_groups
def check_for_matching_attrs(h5_obj, new_parms=None, verbose=False):
    """
    Compares attributes in the given H5 object against those in the provided
    dictionary and returns True if the parameters match, and False otherwise

    Entries of ``new_parms`` whose value is None are skipped. An empty
    ``new_parms`` therefore always matches.

    Parameters
    ----------
    h5_obj : h5py object (Dataset or :class:`h5py.Group`)
        Object whose attributes will be compared against ``new_parms``
    new_parms : dict, optional. default = empty dictionary
        Parameters to compare against the attributes present in h5_obj
    verbose : bool, optional, default = False
        Whether or not to print debugging statements

    Returns
    -------
    tests: bool
        Whether or not all parameters in new_parms matched with those in
        h5_obj's attributes

    Raises
    ------
    TypeError
        If ``h5_obj`` is not a :class:`h5py.Dataset`, :class:`h5py.Group` or
        :class:`h5py.File`, or if ``new_parms`` is not a dictionary
    """
    if not isinstance(h5_obj, (h5py.Dataset, h5py.Group, h5py.File)):
        raise TypeError('h5_obj should be a h5py.Dataset, h5py.Group, or h5py.File object')

    if new_parms is None:
        new_parms = dict()
    elif not isinstance(new_parms, dict):
        raise TypeError('new_parms should be a dictionary')

    tests = []
    for key, new_value in new_parms.items():

        if verbose:
            print('Looking for new attribute named: {}'.format(key))

        # HDF5 cannot store None as an attribute anyway. ignore
        if new_value is None:
            continue

        try:
            old_value = get_attr(h5_obj, key)
        except KeyError:
            # if parameter was not found assume that something has changed
            if verbose:
                print('New parm: {} \t- new parm not in group *****'.format(key))
            tests.append(False)
            break

        if isinstance(old_value, np.ndarray):
            if not isinstance(new_value, Iterable):
                if verbose:
                    print('New parm: {} \t- new parm not iterable unlike old parm *****'.format(key))
                tests.append(False)
                break
            new_array = np.array(new_value)
            if old_value.size != new_array.size:
                if verbose:
                    print('New parm: {} \t- are of different sizes ****'.format(key))
                tests.append(False)
            else:
                try:
                    answer = np.allclose(old_value, new_array)
                except TypeError:
                    # comes here when comparing string arrays
                    # Not sure of a better way
                    answer = np.all([old_val == new_val
                                     for old_val, new_val in zip(old_value, new_array)])
                except ValueError:
                    # Same number of elements but shapes that cannot be
                    # broadcast against each other, e.g. (2, 3) vs (3, 2).
                    # Such parameters cannot be equal
                    answer = False
                if verbose:
                    print('New parm: {} \t- match: {}'.format(key, answer))
                tests.append(answer)
        else:
            answer = np.all(new_value == old_value)
            if verbose:
                print('New parm: {} \t- match: {}'.format(key, answer))
            tests.append(answer)

    if verbose:
        print('')

    return all(tests)
def get_source_dataset(h5_group):
    """
    Find the source dataset used to create the input `h5_group`,
    so long as the source dataset is in the same HDF5 file

    The source is located purely by name: the group is expected to be named
    'SourceDataset-ProcessName_000' and the dataset 'SourceDataset' is looked
    up next to it, in the parent of ``h5_group``.

    Parameters
    ----------
    h5_group : :class:`h5py.Group`
        Child group whose source dataset will be returned

    Returns
    -------
    h5_source : :class:`h5py.Dataset`
        Main dataset from which this group was generated

    Raises
    ------
    TypeError
        If ``h5_group`` is not a :class:`h5py.Group`
    ValueError
        If the name of the group does not contain a '-' separator or if the
        object found under the source name is not a dataset
    """
    if not isinstance(h5_group, h5py.Group):
        raise TypeError('h5_group should be a h5py.Group object')

    h5_parent_group = h5_group.parent
    group_name = h5_group.name.split('/')[-1]
    # What if the group name was not formatted according to Pycroscopy rules?
    # create_results_group removes every '-' from the tool name, so the LAST
    # '-' is the separator. Splitting from the right keeps dataset names
    # that themselves contain '-' intact
    name_split = group_name.rsplit('-', 1)
    if len(name_split) != 2:
        raise ValueError("The provided group's name could not be split by '-' as expected in "
                         "SourceDataset-ProcessName_000")
    h5_source = h5_parent_group[name_split[0]]

    if not isinstance(h5_source, h5py.Dataset):
        raise ValueError('Source object was not a dataset!')

    return h5_source