{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# Formalizing Data Processing using the Process Class\n", "\n", "**Suhas Somnath, Oak Ridge National Lab**\n", "\n", "**Rajiv Giridharagopal, University of Washington**\n", "\n", "Updated: 6/12/2020\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**In this example, we will learn how to implement the pyUSID Process class. This method is \n", "ideal for situations where we want to parallel operate on a large dataset.**\n", "\n", "## Introduction\n", "Most of code written for scientific research is in the form of single-use / one-off scripts due to a few common\n", "reasons:\n", "\n", "* the author feels that it is the fastest mode to accomplishing a research task\n", "* the author feels that they are unlikely to perform the same operation again\n", "* the author does not anticipate the possibility that others may need to run their code\n", "\n", "However, more often than not, nearly all researchers have found that one or more of these assumptions fail and a lot\n", "of time is spent on fixing bugs and generalizing / formalizing code such that it can be shared or reused. Moreover, we\n", "live in an era of open science where the scientific community and an ever-increasing number of scientific journals\n", "are moving towards a paradigm where the data and code need to be made available with journal papers. Therefore, in the\n", "interest of saving time, energy, and reputation, it makes a lot more sense to formalize (parts of) one's data analysis\n", "code.\n", "\n", "For many researchers, formalizing data processing or analysis may seem like a daunting task due to the complexity of\n", "and the number of sub-operations that need to performed. ``pyUSID.Process`` greatly simplifies the process of\n", "formalizing code by lifting or reducing the burden of implementing important, yet tedious tasks and considerations\n", "such as:\n", "\n", "* **memory management** - reading chunks of datasets that can be processed with the available memory, something very\n", " crucial for very large datasets that cannot entirely fit into the computer's memory\n", "* **Scalable parallel computing** -\n", "\n", " * On personal computers - considerate CPU usage - Process will use all but one or two CPU cores for the (parallel)\n", " computation, which allows the user to continue using the computer for other activities such as reading mail, etc.\n", " * New in pyUSID v. 0.0.5 - Ability to scale to multiple computers in a cluster. The Process class can scale the same\n", " scientific code written for personal computers to use multiple computers (or nodes) on a high performance\n", " computing (HPC) resource or a cloud-based cluster to dramatically reduce the computational time\n", "* **pausing and resuming computation** - interrupting and resuming the computation at a more convenient time,\n", " something that is especially valuable for lengthy computations.\n", "* **avoiding repeated computation and returning existing results** - pyUSID.Process will return existing results\n", " computed using the exact same parameters instead of re-computing and storing duplicate copies of the same results.\n", "* **testing before computation** - checking the processing / analysis on a single unit (typically a single pixel) of\n", " data before the entire data is processed. 
This is particularly useful for lengthy computations.\n", "\n", "Using ``pyUSID.Process``, the user only needs to address the following basic operations:\n", "\n", "1. Reading data from file\n", "2. Computation on a single unit of data\n", "3. Writing results to disk\n", "\n", "### Components of pyUSID.Process\n", "The most important functions in the Process class are listed below (a bare-bones skeleton showing how they fit together follows at the end of this section):\n", "\n", "* ``__init__()`` - instantiates a 'Process' object of this class after validating the inputs.\n", "* ``_create_results_datasets()`` - creates the HDF5 datasets and Group(s) to store the results.\n", "* ``_map_function()`` - the operation that will be performed on each element in the dataset.\n", "* ``test()`` - This simple function lets the user test the ``map_function`` on a unit of data (a single pixel\n", "  typically) to see if it returns the desired results. It saves a lot of computational time by allowing the user to\n", "  spot-check results before computing on the entire dataset.\n", "* ``_read_data_chunk()`` - reads the input data from one or more datasets.\n", "* ``_write_results_chunk()`` - writes the computed results back to the file.\n", "* ``_unit_computation()`` - Defines how to process a chunk (multiple units) of data. This allows room for\n", "  pre-processing of input data and post-processing of results if necessary. If neither are required, this function\n", "  essentially applies the parallel computation on ``_map_function()``.\n", "* ``compute()`` - this does the bulk of the work of (iteratively) reading a chunk of data >> processing in parallel\n", "  via ``_unit_computation()`` >> calling ``_write_results_chunk()`` to write data. Most sub-classes, including the one\n", "  below, do not need to extend / modify this function.\n", "\n", "See the \"Flow of Functions\" section near the bottom for a bit more detail.\n", "\n", "### Recommended pre-requisite reading\n", "* [Universal Spectroscopic and Imaging Data (USID) model](https://pycroscopy.github.io/USID/usid_model.html)\n", "* [Crash course on HDF5 and h5py](./h5py_primer.html)\n", "* Utilities for [reading](./hdf_utils_read.html) and [writing](./plot_hdf_utils_write.html)\n", "  h5USID files using pyUSID\n", "* Crash course on [parallel processing](https://pycroscopy.github.io/sidpy/notebooks/01_parallel_computing/parallel_compute.html)\n", "\n", "### Example problem\n", "We will be working with a Band Excitation Piezoresponse Force Microscopy (BE-PFM) imaging dataset\n", "acquired from advanced atomic force microscopes. In this dataset, a spectrum was collected for each position in a two\n", "dimensional grid of spatial locations. Thus, this is a three dimensional dataset that has been flattened to a two\n", "dimensional matrix in accordance with the USID model.\n", "\n", "This example is based on the parallel computing primer where we searched for the peak of each spectrum in a dataset.\n", "While that example focused on comparing serial and parallel computing, we will focus on demonstrating the simplicity\n", "with which such a data analysis algorithm can be formalized.\n", "\n", "This example is a simplification of the pycroscopy.analysis.BESHOFitter class in our sister project - Pycroscopy."
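, "\n", "As a rough preview, the skeleton below sketches how the functions listed under *Components of pyUSID.Process* fit together. This is only an illustrative sketch - the class name ``MyAnalysis``, the results group name ``'My_Analysis'`` and the placeholder parameter are made up for this guide and are not part of pyUSID:\n", "\n", "```python\n", "import pyUSID as usid\n", "\n", "\n", "class MyAnalysis(usid.Process):\n", "\n", "    def __init__(self, h5_main, **kwargs):\n", "        # validate the inputs and record the parameters used for this computation\n", "        super(MyAnalysis, self).__init__(h5_main, 'My_Analysis',\n", "                                         parms_dict={'param_1': 1}, **kwargs)\n", "\n", "    def _create_results_datasets(self):\n", "        # create the HDF5 group and the empty dataset(s) that will hold the results\n", "        pass\n", "\n", "    def _write_results_chunk(self):\n", "        # copy self._results for the current batch of pixels into the HDF5 dataset(s)\n", "        pass\n", "\n", "    @staticmethod\n", "    def _map_function(data_vector, *args, **kwargs):\n", "        # the science: reduce a single unit of data (e.g. one spectrum) to its result\n", "        pass\n", "```\n", "\n", "The remaining machinery (``compute()``, memory management, parallelization) is inherited unchanged from ``pyUSID.Process``."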
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import necessary packages" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "//anaconda/lib/python3.5/site-packages/pyUSID/viz/__init__.py:18: FutureWarning: Please use sidpy.viz.plot_utils instead of pyUSID.viz.plot_utils. pyUSID.plot_utils will be removed in a future release of pyUSID\n", " FutureWarning)\n" ] } ], "source": [ "from __future__ import division, print_function, absolute_import, unicode_literals\n", "\n", "# The package for accessing files in directories, etc.:\n", "import os\n", "\n", "# Warning package in case something goes wrong\n", "from warnings import warn\n", "import subprocess\n", "import sys\n", "\n", "def install(package):\n", " subprocess.call([sys.executable, \"-m\", \"pip\", \"install\", package])\n", "# Package for downloading online files:\n", "try:\n", " # This package is not part of anaconda and may need to be installed.\n", " import wget\n", "except ImportError:\n", " warn('wget not found. Will install with pip.')\n", " import pip\n", " install('wget')\n", " import wget\n", "\n", "# The mathematical computation package:\n", "import numpy as np\n", "\n", "# The package used for creating and manipulating HDF5 files:\n", "import h5py\n", "\n", "# Packages for plotting:\n", "import matplotlib.pyplot as plt\n", "\n", "# import sidpy - supporting package for pyUSID:\n", "try:\n", " import sidpy\n", "except ImportError:\n", " warn('sidpy not found. Will install with pip.')\n", " import pip\n", " install('sidpy')\n", " import sidpy\n", "\n", "# Finally import pyUSID:\n", "try:\n", " import pyUSID as usid\n", "except ImportError:\n", " warn('pyUSID not found. Will install with pip.')\n", " import pip\n", " install('pyUSID')\n", " import pyUSID as usid" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the fitting function as a separate file" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "peak_func_file = 'peak_finding.py'\n", "url = 'https://raw.githubusercontent.com/pycroscopy/pyUSID/master/notebooks/user_guide/supporting_docs/peak_finding.py'\n", "if os.path.exists(peak_func_file):\n", " os.remove(peak_func_file)\n", "_ = wget.download(url, peak_func_file, bar=None)\n", "# the scientific function\n", "from peak_finding import find_all_peaks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The goal is to **find the amplitude at the peak in each spectra**. Clearly, the operation of finding the peak in one\n", "spectra is independent of the same operation on another spectra. Thus, we could divide the dataset in to N parts and\n", "use N CPU cores to compute the results much faster than it would take a single core to compute the results. Such\n", "problems are ideally suited for making use of all the advanced functionalities in the Process class.\n", "\n", "## Defining the class\n", "In order to solve our problem, we would need to implement a ``sub-class`` of pyUSID.Process or in other words -\n", "**extend pyUSID.Process**. As mentioned above, the pyUSID.Process class already generalizes several important\n", "components of data processing. 
We only need to extend this class by implementing the science-specific functionality.\n", "The rest of the capabilities will be **inherited** from pyUSID.Process.\n", "\n", "Let's think about what operations need to be performed for each of the core Process functions listed above.\n", "\n", "### map_function()\n", "The most important component in our new Process class is the unit computation that needs to be performed on each\n", "spectrum. ``map_function()`` needs to take as input a single spectrum and return the amplitude at the peak (a single\n", "value). The ``compute()`` and ``unit_computation()`` functions will handle the parallelization.\n", "\n", "The scipy package has a very handy function called *find_peaks_cwt()* that facilitates the search for one or more\n", "peaks in a spectrum. We will be using a simplified function called *find_all_peaks()*.\n", "The exact methodology for finding the peaks is not of interest for this\n", "particular example. However, this function finds the indices of 0 or more peaks in a spectrum. We only expect\n", "one peak at the center of the spectrum. Therefore, we can use the ``find_all_peaks()`` function to find the peaks and\n", "address those situations when too few or too many (> 1) peaks are found in a single spectrum. Finally, we need to use\n", "the index of the peak to find the amplitude from the spectrum.\n", "\n", "

**Note:**

\n", "\n", "``_map_function()`` must be marked as a\n", "[static method](https://www.geeksforgeeks.org/class-method-vs-static-method-python/) instead of the default\n", "``class method``. This means that ``_map_function()`` should function exactly the same if it were outside the\n", "class we are defining. In other words, it should not make any references to properties or functions of the class\n", "such as ``self.my_important_variable`` or ``self.some_function()``.\n", "\n", "### test()\n", "A useful test function should be able to find the peak amplitude for any single spectra in the dataset. So, given the\n", "index of a pixel (provided by the user), we should perform two operations:\n", "\n", "* read the spectra corresponding to that index from the HDF5 dataset\n", "* apply the ``map_function()`` to this spectra and return the result.\n", "\n", "The goal here is to load the smallest necessary portion of data from the HDF5 dataset to memory and test it against\n", "the ``map_function()``\n", "\n", "### create_results_datasets()\n", "Every Process involves a few tasks for this function:\n", "\n", "* the creation of a HDF5 group to hold the datasets containing the results - pyUSID.hdf_utils has a handy function\n", " that takes care of this.\n", "* storing any relevant metadata regarding this processing as attributes of the HDF5 group for provenance, traceability\n", " , and reproducibility.\n", "\n", " * ``last_pixel`` is a reserved attribute that serves as a flag indicating the last pixel that was successfully\n", " processed and written to the results dataset. This attribute is key for resuming partial computations.\n", "* the creation of HDF5 dataset(s) to hold the results. ``map_function()`` takes a spectra (1D array) and returns the\n", " amplitude (a single value). Thus the input dataset (position, spectra) will be reduced to (position, 1). So, we only\n", " need to create a single empty dataset to hold the results.\n", "\n", "We just need to ensure that we have a reference to the results dataset so that we can populate it with the results.\n", "\n", "### write_results_chunk()\n", "The result of ``compute()`` will be a list of amplitude values. All we need to do is:\n", "\n", "* call the ``self._get_pixels_in_current_batch()`` to find out which pixels were processed in this batch\n", "* write the results into the HDF5 dataset\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class PeakFinder(usid.Process):\n", "\n", " def __init__(self, h5_main, **kwargs):\n", " \"\"\"\n", " Applies Bayesian Inference to General Mode IV (G-IV) data to extract the true current\n", "\n", " Parameters\n", " ----------\n", " h5_main : h5py.Dataset object\n", " Dataset to process\n", " kwargs : dict\n", " Other parameters specific to the Process class and nuanced bayesian_inference parameters\n", " \"\"\"\n", " super(PeakFinder, self).__init__(h5_main, 'Peak_Finding',\n", " parms_dict={'algorithm': 'find_all_peaks'},\n", " **kwargs)\n", "\n", " def test(self, pixel_ind):\n", " \"\"\"\n", " Test the algorithm on a single pixel\n", "\n", " Parameters\n", " ----------\n", " pixel_ind : uint\n", " Index of the pixel in the dataset that the process needs to be tested on.\n", " \"\"\"\n", " # First read the HDF5 dataset to get the spectra for this pixel\n", " spectra = self.h5_main[pixel_ind]\n", " # Next, apply the map function to the spectra. 
done!\n", " return self._map_function(spectra)\n", "\n", " def _create_results_datasets(self):\n", " \"\"\"\n", " Creates the datasets an Groups necessary to store the results.\n", " There are only THREE operations happening in this function:\n", " 1. Creation of HDF5 group to hold results\n", " 2. Writing relevant metadata to this HDF5 group\n", " 3. Creation of a HDF5 dataset to hold results\n", "\n", " Please see examples on utilities for writing h5USID files for more information\n", " \"\"\"\n", " # 1. create a HDF5 group to hold the results\n", " self.h5_results_grp = usid.hdf_utils.create_results_group(self.h5_main, self.process_name)\n", "\n", " # 2. Write relevant metadata to the group\n", " sidpy.hdf_utils.write_simple_attrs(self.h5_results_grp, self.parms_dict)\n", "\n", " # Explicitly stating all the inputs to write_main_dataset\n", " # The process will reduce the spectra at each position to a single value\n", " # Therefore, the result is a 2D dataset with the same number of positions as self.h5_main\n", " results_shape = (self.h5_main.shape[0], 1)\n", " results_dset_name = 'Peak_Response'\n", " results_quantity = 'Amplitude'\n", " results_units = 'V'\n", " pos_dims = None # Reusing those linked to self.h5_main\n", " spec_dims = usid.Dimension('Empty', 'a. u.', 1)\n", "\n", " # 3. Create an empty results dataset that will hold all the results\n", " self.h5_results = usid.hdf_utils.write_main_dataset(self.h5_results_grp, results_shape, results_dset_name,\n", " results_quantity, results_units, pos_dims, spec_dims,\n", " dtype=np.float32,\n", " h5_pos_inds=self.h5_main.h5_pos_inds,\n", " h5_pos_vals=self.h5_main.h5_pos_vals)\n", " # Note that this function automatically creates the ancillary datasets and links them.\n", "\n", " print('Finished creating datasets')\n", "\n", " def _write_results_chunk(self):\n", " \"\"\"\n", " Write the computed results back to the H5\n", " In this case, there isn't any more additional post-processing required\n", " \"\"\"\n", " # Find out the positions to write to:\n", " pos_in_batch = self._get_pixels_in_current_batch()\n", "\n", " # write the results to the file\n", " self.h5_results[pos_in_batch, 0] = np.array(self._results)\n", "\n", " @staticmethod\n", " def _map_function(spectra, *args, **kwargs):\n", " \"\"\"\n", " This is the function that will be applied to each pixel in the dataset.\n", " It's job is to demonstrate what needs to be done for each pixel in the dataset.\n", " pyUSID.Process will handle the parallel computation and memory management\n", "\n", " As in typical scientific problems, the results from find_all_peaks() need to be\n", " post-processed\n", "\n", " In this case, the find_all_peaks() function can sometimes return 0 or more than one peak\n", " for spectra that are very noisy\n", "\n", " Knowing that the peak is typically at the center of the spectra,\n", " we return the central index when no peaks were found\n", " Or the index closest to the center when multiple peaks are found\n", "\n", " Finally once we have a single index, we need to index the spectra by that index\n", " in order to get the amplitude at that frequency.\n", " \"\"\"\n", "\n", " peak_inds = find_all_peaks(spectra, [20, 60], num_steps=30)\n", "\n", " central_ind = len(spectra) // 2\n", " if len(peak_inds) == 0:\n", " # too few peaks\n", " # set peak to center of spectra\n", " val = central_ind\n", " elif len(peak_inds) > 1:\n", " # too many peaks\n", " # set to peak closest to center of spectra\n", " dist = np.abs(peak_inds - central_ind)\n", " val = 
peak_inds[np.argmin(dist)]\n", "        else:\n", "            # normal situation\n", "            val = peak_inds[0]\n", "        # Finally take the amplitude of the spectra at this index\n", "        return np.abs(spectra[val])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Comments\n", "* The class appears to be large mainly because of comments that explain what each line of code is doing.\n", "* Several functions such as ``__init__()`` and ``compute()`` were inherited from the\n", "  pyUSID.Process class.\n", "* In simple cases such as this, we don't even have to implement a function to read the data from the dataset since\n", "  pyUSID.Process automatically calculates how much of the data is safe to load into memory. In this case, the\n", "  dataset is far smaller than the computer memory, so the entire dataset can be loaded and processed at once.\n", "* In this example, we did not need any pre-processing or post-processing of results but those can be implemented too\n", "  if necessary.\n", "* The majority of the code in this class would have to be written regardless of whether the intention is to formalize the\n", "  data processing or not. In fact, we would argue that **more** code may need to be written than what is shown below\n", "  if one were **not** formalizing the data processing (data reading, parallel computing, memory management, etc.).\n", "* This is the simplest possible implementation of Process. Certain features such as checking for existing results and\n", "  resuming partial computations have not been shown in this example.\n", "\n", "### Use the class\n", "Now that the class has been written, it can be applied to an actual dataset.\n", "\n", "### Load the dataset\n", "In order to demonstrate this Process class, we will be using a real experimental dataset that is available on the\n", "pyUSID GitHub project.
First, lets download this file from Github:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_path = 'temp.h5'\n", "url = 'https://raw.githubusercontent.com/pycroscopy/pyUSID/master/data/BELine_0004.h5'\n", "if os.path.exists(h5_path):\n", " os.remove(h5_path)\n", "_ = wget.download(url, h5_path, bar=None)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lets open the file in an editable (r+) mode and look at the contents:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_file = h5py.File(h5_path, mode='r+')\n", "print('File contents:\\n')\n", "sidpy.hdf_utils.print_tree(h5_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The focus of this example is not on the data storage or formatting but rather on demonstrating our new Process class\n", "so lets dive straight into the main dataset that requires analysis of the spectra:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_chan_grp = h5_file['Measurement_000/Channel_000']\n", "\n", "# Accessing the dataset of interest:\n", "h5_main = usid.USIDataset(h5_chan_grp['Raw_Data'])\n", "print('\\nThe main dataset:\\n------------------------------------')\n", "print(h5_main)\n", "\n", "# Extract some metadata:\n", "num_rows, num_cols = h5_main.pos_dim_sizes\n", "freq_vec = h5_main.get_spec_values('Frequency') * 1E-3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use the Process class\n", "\n", "### Instantiation\n", "Note that the instantiation of the new ``PeakFinder`` Process class only requires that we supply the main dataset on\n", "which the computation will be performed:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fitter = PeakFinder(h5_main)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### test()\n", "As advised, lets test the ``PeakFinder`` on an example pixel:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "row_ind, col_ind = 103, 19\n", "pixel_ind = col_ind + row_ind * num_cols\n", "\n", "# Testing is as simple as supplying a pixel index\n", "amplitude = fitter.test(pixel_ind)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's visualize the results of the test:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spectra = h5_main[pixel_ind]\n", "\n", "fig, axis = plt.subplots(figsize=(4, 4))\n", "axis.scatter(freq_vec, np.abs(spectra), c='black')\n", "axis.axhline(amplitude, color='r', linewidth=2)\n", "axis.set_xlabel('Frequency (kHz)', fontsize=14)\n", "axis.set_ylabel('Amplitude (V)')\n", "axis.set_ylim([0, 1.1 * np.max(np.abs(spectra))])\n", "axis.set_title('PeakFinder applied to pixel\\nat row: {}, col: {}'.format(row_ind, col_ind), fontsize=16);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we weren't happy with the results, we could tweak some parameters when initializing the ``PeakFinder`` object and try\n", "again. However, for the sake of simplicity, we don't have any parameters we can / want to adjust in this case. 
So,\n", "lets proceed.\n", "\n", "### compute()\n", "Now that we know that the ``PeakFinder`` appears to be performing as expected, we can apply the amplitude finding\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_results_grp = fitter.compute()\n", "print(h5_results_grp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lets take a look again at the file contents. We should be seeing a new HDF5 group called ``Raw_Data-Peak_Finding_000`` and\n", "three datasets within the group. Among the datasets is ``Peak_Response`` that contains the peak amplitudes we are\n", "interested in.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sidpy.hdf_utils.print_tree(h5_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lets look at this ``Peak_Response`` dataset:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_peak_amps = usid.USIDataset(h5_results_grp['Peak_Response'])\n", "print(h5_peak_amps)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize\n", "Since ``Peak_Response`` is a USIDataset, we could use its built-in ``visualize()`` function:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_peak_amps.visualize()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up\n", "Finally lets close and delete the example HDF5 file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_file.close()\n", "os.remove(h5_path)\n", "os.remove(peak_func_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Flow of functions\n", "\n", "By default, very few functions (``test()``, ``compute()``) are exposed to users. This means that one of these\n", "functions calls a chain of the other functions in the class.\n", "\n", "### init()\n", "Instantiating the class via something like: ``fitter = PeakFinder(h5_main)`` happens in two parts:\n", "\n", "1. First the subclass (``PeakFinder``) calls the initialization function in ``Process`` to let it run some checks:\n", "\n", " * Check if the provided ``h5_main`` is indeed a ``Main`` dataset\n", " * call ``set_memory_and_cores()`` to figure out how many pixels can be read into memory at any given time\n", " * Initialize some basic variables\n", "\n", "2. 
Next, the subclass continues any further validation / checks / initialization - this was not implemented for\n", "``PeakFinder`` but here are some things that can be done:\n", "\n", "   * Find HDF5 groups which either have partial or fully computed results already for the same parameters by calling\n", "     ``check_for_duplicates()``\n", "\n", "### test()\n", "By definition, this function only calls the ``map_function()``.\n", "\n", "### compute()\n", "Here is how ``compute()`` works:\n", "\n", "* Check if you can return existing results for the requested computation and, if available, return them by calling either:\n", "\n", "  * ``get_existing_datasets()`` - reads all necessary parameters and gets references to the HDF5 datasets that should\n", "    contain the results\n", "  * ``use_partial_computation()`` - pick the first partially computed results group that was discovered by\n", "    ``check_for_duplicates()``\n", "* call ``create_results_datasets()`` to create the HDF5 datasets and group objects\n", "* read the first chunk of data via ``read_data_chunk()`` into ``self.data``\n", "* Until the source dataset is fully read (``self.data is not None``), do:\n", "\n", "  * call ``unit_computation()`` on ``self.data``\n", "\n", "    * By default ``unit_computation()`` just maps ``map_function()`` onto ``self.data``\n", "    * If you need to pass specific arguments, you may need to implement it directly. See \"Advanced Examples\"\n", "  * call ``write_results_chunk()`` to write ``self._results`` into the HDF5 datasets\n", "  * read the next chunk of data into ``self.data``\n", "\n", "#### use_partial_computation()\n", "Not used in ``PeakFinder`` but this function can be called to manually specify an HDF5 group containing partial\n", "results.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tips and tricks\n", "Here we will cover a few common use-cases that will hopefully guide you in structuring your computational problem.\n", "\n", "### Advanced examples\n", "Please see the following pycroscopy classes to learn more about advanced functionalities such as resuming\n", "computations, checking for existing results, using unit_computation(), etc.:\n", "\n", "* [SignalFilter](https://github.com/pycroscopy/pycroscopy/blob/master/pycroscopy/processing/signal_filter.py)\n", "* [GIVBayesian](https://github.com/pycroscopy/BGlib/blob/master/BGlib/gmode/analysis/giv_bayesian.py)\n", "* [FFTA](https://github.com/rajgiriUW/ffta/blob/master/ffta/hdf_utils/process.py)\n", "\n", "These classes work on personal computers as well as a cluster of computers (e.g. - a high-performance computing\n", "cluster).\n", "\n", "### Integrating into your personal workflow\n", "As an example of how to integrate with an outside codebase, the package [FFTA](https://github.com/rajgiriUW/ffta/blob/master/ffta/hdf_utils/process.py)\n", "implements its own Process class for parallel computation. There you can see how to pass arguments to ``unit_computation()``.\n", "\n", "\n", "### Juggling dimensions\n", "We intentionally chose a simple example above to quickly illustrate the main components / philosophy of the Process\n", "class. The above example had two position dimensions collapsed into the first axis of the dataset and a single\n", "spectroscopic dimension (``Frequency``). What if the spectra were acquired as a function of other variables such as a\n", "``DC bias``? In other words, the dataset would now have N spectra per location.
In such cases, the dataset would have\n", "2 spectroscopic dimensions: ``Frequency`` and ``DC bias``. We cannot therefore simply map the ``map_function()`` to\n", "the data in every pixel. This is because the ``map_function()`` expects to work over a single spectrum whereas we now\n", "have N spectra per pixel. Contrary to what one would assume, we do not need to throw away all the code we wrote above.\n", "We only need to add code to juggle / move the dimensions around till the problem looks similar to what we had above.\n", "\n", "In other words, the above problem was written for a dataset of shape ``(P, S)`` where ``P`` is the number of positions\n", "and ``S`` is the length of a single spectrum. Now, we have data of shape ``(P, N*S)`` where ``N`` is the number of\n", "spectra per position. In order to use most of the code already written above, we need to reshape the data to the shape\n", "``(P*N, S)``. Now, we can easily map the existing ``map_function()`` on this ``(P*N, S)`` dataset.\n", "\n", "As far as implementation is concerned, we would need to add the reshaping step to ``_read_data_chunk()`` as:\n", "\n", "```python\n", "\n", "def _read_data_chunk(self):\n", "    super(PeakFinder, self)._read_data_chunk()\n", "    # The above line causes the base Process class to read X pixels from the dataset into self.data\n", "    # All we need to do now is reshape self.data from (X, N*S) to (X*N, S):\n", "    # Assuming that we know N (num_spectra) through some metadata:\n", "    self.data = self.data.reshape(self.data.shape[0] * num_spectra, -1)\n", "```\n", "\n", "Recall that ``_read_data_chunk()`` reads ``X`` pixels at a time where ``X`` is the largest number of pixels whose raw\n", "data, intermediate products, and results can simultaneously be held in memory. The dataset used for the example above\n", "is tractable enough that the entire data is loaded at once, meaning that ``X = P`` in this case.\n", "\n", "From here on, the computation would continue as is but, as expected, the results would also be of shape\n", "``(P*N)``. We would have to reverse the reshape operation to get back the results in the form ``(P, N)``. So we\n", "would prepend the reverse reshape operation to ``_write_results_chunk()``:\n", "\n", "```python\n", "\n", "def _write_results_chunk(self):\n", "    # Recall that the results from the computation are stored in a list called self._results\n", "    self._results = np.array(self._results)  # convert from list to numpy array\n", "    self._results = self._results.reshape(-1, num_spectra)\n", "    # Now self._results is of shape (P, N) and we can store it in the HDF5 dataset as we did above.\n", "\n", "```\n", "\n", "### Computing on chunks instead of mapping\n", "In certain cases, the computation is complex enough that the ``map_function()`` cannot directly be mapped to\n", "the data. Alternatively, in some cases the ``map_function()`` needs to be mapped multiple times or onto different sections of\n", "``self.data``. For such cases, the ``_unit_computation()`` in ``Process`` provides far more flexibility to the\n", "developer. Please see ``pycroscopy.processing.SignalFilter`` and ``BGlib.gmode.analysis.GIVBayesian`` for examples.\n", "\n", "By default, ``_unit_computation()`` maps the ``map_function()`` to ``self.data`` using ``parallel_compute()`` and\n", "stores the results in ``self._results``.
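\n", "\n", "To make this concrete, below is a rough sketch (not part of the original notebook) of what overriding ``_unit_computation()`` could look like if, say, each half of every spectrum had to be processed separately. The ``self._cores`` attribute and the top-level ``usid.parallel_compute()`` call are assumptions about the pyUSID version in use:\n", "\n", "```python\n", "def _unit_computation(self, *args, **kwargs):\n", "    # Hypothetical override: map the function over the two halves of every\n", "    # spectrum separately and stitch the two sets of results back together\n", "    half = self.data.shape[1] // 2\n", "    forward = usid.parallel_compute(self.data[:, :half], self._map_function,\n", "                                    cores=self._cores)\n", "    reverse = usid.parallel_compute(self.data[:, half:], self._map_function,\n", "                                    cores=self._cores)\n", "    self._results = [(f, r) for f, r in zip(forward, reverse)]\n", "```\n", "\n", "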
Recall that ``self.data`` contains data for ``X`` pixels.\n", "For example, ``_unit_computation()`` in ``BGlib.gmode.analysis.GIVBayesian`` breaks up the spectra (second axis) of\n", "``self.data`` into two halves and computes the results separately for each half. ``_unit_computation()`` for this\n", "class calls ``parallel_compute()`` twice - to map the ``map_function()`` to each half of the data chunk. This is a\n", "functionality that is challenging to efficiently attain without ``_unit_computation()``. Note that when the\n", "``_unit_computation()`` is overridden, the developer is responsible for the correct usage of ``parallel_compute()``,\n", "especially passing arguments and keyword arguments.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python [default]", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.5" } }, "nbformat": 4, "nbformat_minor": 1 }