Source code for pyUSID.io.anc_build_utils

# -*- coding: utf-8 -*-
"""
Utilities that assist in building ancillary USID datasets manually.
Formerly known as "write_utils"

Created on Thu Sep  7 21:14:25 2017

@author: Suhas Somnath, Chris Smith
"""

from __future__ import division, print_function, unicode_literals, absolute_import
import sys
import numpy as np
from sidpy.base.num_utils import contains_integers
from sidpy.base.string_utils import validate_list_of_strings
# For legacy reasons
from .dimension import Dimension, DimType, validate_dimensions

__all__ = ['get_aux_dset_slicing', 'make_indices_matrix', 'INDICES_DTYPE',
           'VALUES_DTYPE', 'build_ind_val_matrices', 'calc_chunks',
           'create_spec_inds_from_vals',
           'Dimension', 'DimType', 'validate_dimensions']

if sys.version_info.major == 3:
    unicode = str

# Constants:
INDICES_DTYPE = np.uint32
VALUES_DTYPE = np.float32


def get_aux_dset_slicing(dim_names, last_ind=None, is_spectroscopic=False):
    """
    Returns a dictionary of slice objects to help in creating region references
    in the position or spectroscopic indices and values datasets

    Parameters
    ------------
    dim_names : iterable
        List of strings denoting the names of the position axes or spectroscopic
        dimensions arranged in the same order that matches the dimensions in the
        indices / values dataset
    last_ind : (Optional) unsigned int, default = None
        Last pixel in the position or spectroscopic matrix. Useful in experiments
        where the parameters have changed (e.g. BEPS new data format) during the
        experiment.
    is_spectroscopic : bool, optional. default = False
        Set to True for spectroscopic datasets and False for position datasets

    Returns
    ------------
    slice_dict : dictionary
        Dictionary of tuples containing slice objects corresponding to each
        position axis.
    """
    dim_names = validate_list_of_strings(dim_names, 'dim_names')
    if len(dim_names) == 0:
        raise ValueError('No valid dim_names provided')

    slice_dict = dict()
    for spat_ind, curr_dim_name in enumerate(dim_names):
        val = (slice(last_ind), slice(spat_ind, spat_ind + 1))
        if is_spectroscopic:
            val = val[::-1]
        slice_dict[str(curr_dim_name)] = val
    return slice_dict
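A minimal usage sketch (not part of the original module; the dimension names 'X', 'Y' and 'Bias' are arbitrary placeholders):

    from pyUSID.io.anc_build_utils import get_aux_dset_slicing

    # Position dataset: each name maps to (all rows, one column)
    pos_slices = get_aux_dset_slicing(['X', 'Y'])
    assert pos_slices['X'] == (slice(None), slice(0, 1))
    assert pos_slices['Y'] == (slice(None), slice(1, 2))

    # Spectroscopic dataset: the tuple is reversed to (one row, all columns)
    spec_slices = get_aux_dset_slicing(['Bias'], is_spectroscopic=True)
    assert spec_slices['Bias'] == (slice(0, 1), slice(None))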
def make_indices_matrix(num_steps, is_position=True):
    """
    Makes an ancillary indices matrix given the number of steps in each dimension.
    In other words, this function builds a matrix whose rows correspond to unique
    combinations of the multiple dimensions provided.

    Parameters
    ------------
    num_steps : List / numpy array / int
        Number of steps in each spatial or spectral dimension
        Note that the axes must be ordered from fastest varying to slowest varying
    is_position : bool, optional, default = True
        Whether the returned matrix is meant for position (True) indices
        (tall and skinny) or spectroscopic (False) indices (short and wide)

    Returns
    --------------
    indices_matrix : 2D unsigned int numpy array
        arranged as [steps, spatial dimension]
    """
    if isinstance(num_steps, int):
        num_steps = [num_steps]
    if not isinstance(num_steps, (tuple, list, np.ndarray)):
        raise TypeError('num_steps should be a list / tuple / numpy array')
    if isinstance(num_steps, np.ndarray) and num_steps.ndim < 1:
        num_steps = np.expand_dims(num_steps, 0)
    if len(num_steps) == 0:
        raise ValueError('num_steps should not be an empty array or list')
    if len(num_steps) == 1 and num_steps[0] == 1:
        num_steps = [1]
    elif not contains_integers(num_steps, min_val=1 + int(len(num_steps) > 0)):
        raise ValueError('num_steps should contain integers greater than or '
                         'equal to 1 (empty dimension) or 2')
    num_steps = np.array(num_steps)
    spat_dims = max(1, len(np.where(num_steps > 1)[0]))

    indices_matrix = np.zeros(shape=(np.prod(num_steps), spat_dims),
                              dtype=INDICES_DTYPE)
    dim_ind = 0

    for indx, curr_steps in enumerate(num_steps):
        if curr_steps > 1:

            part1 = np.prod(num_steps[:indx + 1])

            if indx > 0:
                part2 = np.prod(num_steps[:indx])
            else:
                part2 = 1

            if indx + 1 == len(num_steps):
                part3 = 1
            else:
                part3 = np.prod(num_steps[indx + 1:])

            indices_matrix[:, dim_ind] = np.tile(np.floor(np.arange(part1) / part2), part3)
            dim_ind += 1

    if not is_position:
        indices_matrix = indices_matrix.T

    return indices_matrix
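To make the fastest-to-slowest ordering concrete, here is a small sketch (values chosen for illustration) with 2 steps in the fast dimension and 3 in the slow one:

    from pyUSID.io.anc_build_utils import make_indices_matrix

    inds = make_indices_matrix([2, 3])   # position indices: tall and skinny
    print(inds.shape)                    # (6, 2)
    print(inds.T)
    # [[0 1 0 1 0 1]    <- fastest dimension cycles every step
    #  [0 0 1 1 2 2]]   <- slowest dimension increments every 2 steps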
def build_ind_val_matrices(unit_values, is_spectral=True):
    """
    Builds indices and values matrices using given unit values for each dimension.
    Unit values must be arranged from fastest varying to slowest varying

    Parameters
    ----------
    unit_values : list / tuple
        Sequence of values vectors for each dimension
    is_spectral : bool (optional), default = True
        If True, returns matrices for spectroscopic datasets, else returns
        matrices for Position datasets

    Returns
    -------
    ind_mat : 2D numpy array
        Indices matrix
    val_mat : 2D numpy array
        Values matrix
    """
    if not isinstance(unit_values, (list, tuple)):
        raise TypeError('unit_values should be a list or tuple')
    if not np.all([np.array(x).ndim == 1 for x in unit_values]):
        raise ValueError('unit_values should only contain 1D arrays')
    lengths = [len(x) for x in unit_values]
    tile_size = [np.prod(lengths[x:]) for x in range(1, len(lengths))] + [1]
    rep_size = [1] + [np.prod(lengths[:x]) for x in range(1, len(lengths))]
    val_mat = np.zeros(shape=(len(lengths), np.prod(lengths)))
    ind_mat = np.zeros(shape=val_mat.shape, dtype=np.uint32)
    for ind, ts, rs, vec in zip(range(len(lengths)), tile_size, rep_size,
                                unit_values):
        val_mat[ind] = np.tile(np.repeat(vec, rs), ts)
        ind_mat[ind] = np.tile(np.repeat(np.arange(len(vec)), rs), ts)
    if not is_spectral:
        val_mat = val_mat.T
        ind_mat = ind_mat.T
    return INDICES_DTYPE(ind_mat), VALUES_DTYPE(val_mat)
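A short sketch (illustrative values, not from the original source): a three-step 'Bias' dimension varying fastest against a two-step 'Cycle' dimension:

    from pyUSID.io.anc_build_utils import build_ind_val_matrices

    inds, vals = build_ind_val_matrices([[-1, 0, 1], [10, 20]])
    print(vals)
    # [[-1.  0.  1. -1.  0.  1.]     <- Bias sweeps within each cycle
    #  [10. 10. 10. 20. 20. 20.]]    <- Cycle held while Bias sweeps
    print(inds)
    # [[0 1 2 0 1 2]
    #  [0 0 0 1 1 1]]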
def create_spec_inds_from_vals(ds_spec_val_mat):
    """
    Creates a new Spectroscopic Indices table from the changes in the
    Spectroscopic Values

    Parameters
    ----------
    ds_spec_val_mat : array-like
        Holds the spectroscopic values to be indexed

    Returns
    -------
    ds_spec_inds_mat : numpy array of ints the same shape as ds_spec_val_mat
        Indices corresponding to the values in ds_spec_val_mat
    """
    if not isinstance(ds_spec_val_mat, np.ndarray):
        raise TypeError('ds_spec_val_mat must be a numpy array')
    if ds_spec_val_mat.ndim != 2:
        raise ValueError('ds_spec_val_mat must be a 2D array arranged as '
                         '[dimension, values]')

    ds_spec_inds_mat = np.zeros_like(ds_spec_val_mat, dtype=np.int32)

    """
    Find how quickly the spectroscopic values are changing in each row
    and the order of rows from fastest changing to slowest.
    """
    change_count = [len(np.where([row[i] != row[i - 1] for i in range(len(row))])[0])
                    for row in ds_spec_val_mat]
    change_sort = np.argsort(change_count)[::-1]

    """
    Determine everywhere the spectroscopic values change and build the
    index table based on those changes
    """
    indices = np.zeros(ds_spec_val_mat.shape[0])
    for jcol in range(1, ds_spec_val_mat.shape[1]):
        this_col = ds_spec_val_mat[change_sort, jcol]
        last_col = ds_spec_val_mat[change_sort, jcol - 1]

        """
        Check if the current column values are different from those in the
        last column.
        """
        changed = np.where(this_col != last_col)[0]

        """
        If only one row changed, increment the index for that column.
        If more than one row has changed, increment the index for the last
        row that changed and set all others to zero.
        """
        if len(changed) == 1:
            indices[changed] += 1
        elif len(changed) > 1:
            for change in changed[:-1]:
                indices[change] = 0
            indices[changed[-1]] += 1

        """
        Store the indices for the current column in the dataset
        """
        ds_spec_inds_mat[change_sort, jcol] = indices

    return ds_spec_inds_mat
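As a hedged round-trip check, feeding the values matrix from the previous sketch back through this function recovers the same indices that build_ind_val_matrices produced:

    import numpy as np
    from pyUSID.io.anc_build_utils import create_spec_inds_from_vals

    vals = np.array([[-1, 0, 1, -1, 0, 1],
                     [10, 10, 10, 20, 20, 20]])
    print(create_spec_inds_from_vals(vals))
    # [[0 1 2 0 1 2]
    #  [0 0 0 1 1 1]]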
def calc_chunks(dimensions, dtype_byte_size, unit_chunks=None, max_chunk_mem=10240):
    """
    Calculates the chunk size for a HDF5 dataset based on the dimensions and
    the maximum chunk size in memory

    Parameters
    ----------
    dimensions : array_like of int
        Shape of the data to be chunked
    dtype_byte_size : unsigned int
        Size of an entry in the data in bytes
    unit_chunks : array_like of int, optional
        Unit size of the chunking in each dimension. Must be the same length
        as `dimensions`. Default None, `unit_chunks` is set to 1 in all
        dimensions
    max_chunk_mem : int, optional
        Maximum size of the chunk in memory in bytes. Default 10240 bytes
        (10 kB) per h5py recommendations

    Returns
    -------
    chunking : tuple of int
        Calculated maximum size of a chunk in each dimension that is as close
        to the requested `max_chunk_mem` as possible while having steps based
        on the input `unit_chunks`.
    """
    if not isinstance(dimensions, (list, tuple)):
        raise TypeError('dimensions should either be a tuple or list')
    if not isinstance(dtype_byte_size, int):
        raise TypeError('dtype_byte_size should be an integer')
    if unit_chunks is not None:
        if not isinstance(unit_chunks, (tuple, list)):
            raise TypeError('unit_chunks should either be a tuple or list')

    '''
    Ensure that dimensions is an array
    '''
    dimensions = np.asarray(dimensions, dtype=np.uint)
    '''
    Set the unit_chunks to all ones if not given. Ensure it is an array if it is.
    '''
    if unit_chunks is None:
        unit_chunks = np.ones_like(dimensions)
    else:
        unit_chunks = np.asarray(unit_chunks, dtype=np.uint)

    if unit_chunks.shape != dimensions.shape:
        raise ValueError('Unit chunk size must have the same shape as the input dataset.')

    '''
    Save the original size of unit_chunks to use for incrementing the chunk
    size during the loop
    '''
    base_chunks = unit_chunks.copy()

    '''
    Loop until the chunk size is greater than the maximum chunk_mem or the
    chunk size is equal to that of dimensions
    '''
    while np.prod(unit_chunks) * dtype_byte_size <= max_chunk_mem:
        '''
        Check if all chunk dimensions are greater than or equal to the
        actual dimensions. Exit the loop if true.
        '''
        if np.all(unit_chunks >= dimensions):
            break

        '''
        Find the index of the next chunk to be increased and increment it by
        the base_chunks size
        '''
        ichunk = np.argmax(dimensions / unit_chunks)
        unit_chunks[ichunk] += base_chunks[ichunk]

    '''
    Ensure that the size of the chunks is between one and the dimension size.
    '''
    unit_chunks = np.clip(unit_chunks, np.ones_like(unit_chunks), dimensions)

    chunking = tuple(unit_chunks)

    return chunking
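A usage sketch (the printed tuples were worked out by hand from the loop above, so treat them as indicative): chunking a 16384 x 16384 array of 4-byte values against the default 10240 byte cap:

    from pyUSID.io.anc_build_utils import calc_chunks

    print(calc_chunks([16384, 16384], 4))
    # (51, 51): 51 * 51 * 4 = 10404 bytes, just past the cap, because the
    # loop only stops after the cap is first exceeded

    # unit_chunks forces chunk edges to multiples of the given unit
    print(calc_chunks([16384, 16384], 4, unit_chunks=(1, 128)))
    # (21, 128)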