{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n# Speed up computations with parallel_compute()\n\n**Suhas Somnath, Chris R. Smith**\n\n9/8/2017\n\n**This document will demonstrate how ``sidpy.proc.comp_utils.parallel_compute()`` can significantly speed up data processing by\nusing all available CPU cores in a computer**\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\nQuite often, we need to perform the same operation on every single component in our data. One of the most popular\nexamples is functional fitting applied to spectra collected at each location on a grid. While, the operation itself\nmay not take very long, computing this operation thousands of times, once per location, using a single CPU core can\ntake a long time to complete. Most personal computers today come with at least two cores, and in many cases, each of\nthese cores is represented via two logical cores, thereby summing to a total of at least four cores. Thus, it is\nprudent to make use of these unused cores whenever possible. Fortunately, there are a few python packages that\nfacilitate the efficient use of all CPU cores with minimal modifications to the existing code.\n\n``sidpy.proc.comp_utils.parallel_compute()`` is a very handy function that simplifies parallel computation significantly to a\n**single function call** and will be discussed in this document.\n\n## Example scientific problem\nFor this example, we will be working with a ``Band Excitation Piezoresponse Force Microscopy (BE-PFM)`` imaging dataset\nacquired from advanced atomic force microscopes. In this dataset, a spectra was collected for each position in a two\ndimensional grid of spatial locations. Thus, this is a three dimensional dataset that has been flattened to a two\ndimensional matrix in accordance with **Universal Spectroscopy and Imaging Data (USID)** model.\n\nEach spectra in this dataset is expected to have a single peak. The goal is to find the positions of the peaks in each\nspectra. Clearly, the operation of finding the peak in one spectra is independent of the same operation on another\nspectra. Thus, we could in theory divide the dataset in to N parts and use N CPU cores to compute the results much\nfaster than it would take a single core to compute the results. There is an important caveat to this statement and it\nwill be discussed at the end of this document.\n\n**Here, we will learn how to fit the thousands of spectra using all available cores on a computer.**\nNote, that this is applicable only for a single CPU. Please refer to another advanced example for multi-CPU computing.\n\n

Note

In order to run this document on your own computer, you will need to:\n\n 1. Download the document as a Jupyter notebook using the link at the bottom of this page.\n 2. Save the contents of `this python file `_ as ``peak_finding.py`` in the\n same folder as the notebook from step 1.

\n\nEnsure python 3 compatibility:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from __future__ import division, print_function, absolute_import, unicode_literals\n\n# The package for accessing files in directories, etc.:\nimport os\n\n# Warning package in case something goes wrong\nfrom warnings import warn\nimport subprocess\n\n\ndef install(package):\n subprocess.call([sys.executable, \"-m\", \"pip\", \"install\", package])\n# Package for downloading online files:\ntry:\n # This package is not part of anaconda and may need to be installed.\n import wget\nexcept ImportError:\n warn('wget not found. Will install with pip.')\n import pip\n install(wget)\n import wget\n\n# The mathematical computation package:\nimport numpy as np\n\n# The package used for creating and manipulating HDF5 files:\nimport h5py\n\n# Packages for plotting:\nimport matplotlib.pyplot as plt\n\n# Parallel computation library:\ntry:\n import joblib\nexcept ImportError:\n warn('joblib not found. Will install with pip.')\n import pip\n install('joblib')\n import joblib\n\n# Timing\nimport time\n\n# A handy python utility that allows us to preconfigure parts of a function\nfrom functools import partial\n\n# Finally import sidpy:\ntry:\n from sidpy.proc.comp_utils import parallel_compute\nexcept ImportError:\n warn('sidpy not found. Will install with pip.')\n import pip\n install('sidpy')\n from sidpy.proc.comp_utils import parallel_compute\n\n# import the scientific function:\nimport sys\nsys.path.append('./supporting_docs/')\nfrom peak_finding import find_all_peaks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the dataset\nIn order to demonstrate parallel computing, we will be using a real experimental dataset that is available on the\npyUSID GitHub project. First, lets download this file from Github:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "h5_path = 'temp.h5'\nurl = 'https://raw.githubusercontent.com/pycroscopy/pyUSID/master/data/BELine_0004.h5'\nif os.path.exists(h5_path):\n os.remove(h5_path)\n_ = wget.download(url, h5_path, bar=None)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, lets open this HDF5 file. The focus of this example is not on the data storage or arrangement but rather on\ndemonstrating parallel computation so lets dive straight into the main dataset that requires fitting of the spectra:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Open the file in read-only mode\nh5_file = h5py.File(h5_path, mode='r')\n# Get handle to the the raw data\nh5_meas_grp = h5_file['Measurement_000']\n\n# Accessing the dataset of interest:\nh5_main = h5_meas_grp['Channel_000/Raw_Data']\nprint('\\nThe main dataset:\\n------------------------------------')\nprint(h5_main)\n\nnum_cols = 128\ncores_vec = list()\ntimes_vec = list()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The operation\nThe scipy package has a very handy function called *find_peaks_cwt()* that facilitates the search for one or more\npeaks in a spectrum. We will be using a function called *find_all_peaks()* that uses *find_peaks_cwt()*.\nFor the purposes of this example, we do not be concerned with how this\nfunction works. All we need to know is that this function takes 3 inputs:\n\n* ``vector`` - a 1D array containing the spectra at a single location\n* ``width_bounds`` - something like [20, 50] that instructs the function to look for peaks that are 20-50\n data-points wide. The function will look for a peak with width of 20, then again for a peak of width - 21 and so on.\n* ``num_steps`` - The number of steps within the possible widths [20, 50], that the search must be performed\n\nThe function has one output:\n\n* ``peak_indices`` - an array of the positions at which peaks were found.\n\n.. code-block:: python\n\n def find_all_peaks(vector, width_bounds, num_steps=20, **kwargs):\n \"\"\"\n This is the function that will be mapped by multiprocess. This is a wrapper around the scipy function.\n It uses a parameter - wavelet_widths that is configured outside this function.\n\n Parameters\n ----------\n vector : 1D numpy array\n Feature vector containing peaks\n width_bounds : tuple / list / iterable\n Min and max for the size of the window\n num_steps : uint, (optional). Default = 20\n Number of different peak widths to search\n\n Returns\n -------\n peak_indices : list\n List of indices of peaks within the prescribed peak widths\n \"\"\"\n # The below numpy array is used to configure the returned function wpeaks\n wavelet_widths = np.linspace(width_bounds[0], width_bounds[1], num_steps)\n\n peak_indices = find_peaks_cwt(np.abs(vector), wavelet_widths, **kwargs)\n\n return peak_indices\n\n## Testing the function\nLet\u2019s see what the operation on an example spectra returns.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "row_ind, col_ind = 103, 19\npixel_ind = col_ind + row_ind * num_cols\nspectra = h5_main[pixel_ind]\n\npeak_inds = find_all_peaks(spectra, [20, 60], num_steps=30)\n\nfig, axis = plt.subplots()\naxis.scatter(np.arange(len(spectra)), np.abs(spectra), c='black')\naxis.axvline(peak_inds[0], color='r', linewidth=2)\naxis.set_ylim([0, 1.1 * np.max(np.abs(spectra))]);\naxis.set_title('find_all_peaks found peaks at index: {}'.format(peak_inds), fontsize=16)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before we apply the function to the entire dataset, lets load the dataset to memory so that file-loading time is not a\nfactor when comparing the times for serial and parallel computing times:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "raw_data = h5_main[()]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

Note

This documentation is being generated automatically by a computer in the cloud whose workload cannot be controlled\n or predicted. Therefore, the computational times reported in this document may not be consistent and can even be\n contradictory. For best results, we recommend that download and run this document as a jupyter notebook.

\n\n## Serial computing\nA single call to the function does not take substantial time. However, performing the same operation on each of the\n``16,384`` pixels sequentially can take substantial time. The simplest way to find all peak positions is to simply loop\nover each position in the dataset:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "serial_results = list()\n\nt_0 = time.time()\nfor vector in raw_data:\n serial_results.append(find_all_peaks(vector, [20, 60], num_steps=30))\ntimes_vec.append(time.time()-t_0)\ncores_vec.append(1)\nprint('Serial computation took', np.round(times_vec[-1], 2), ' seconds')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## sidpy.proc.comp_utils.parallel_compute()\n\nThere are several libraries that can utilize multiple CPU cores to perform the same computation in parallel. Popular\nexamples are ``Multiprocessing``, ``Mutiprocess``, ``Dask``, ``Joblib`` etc. Each of these has their own\nstrengths and weaknesses. Some of them have painful caveats such as the inability to perform the parallel computation\nwithin a jupyter notebook. In order to lower the barrier to parallel computation, we have developed a very handy\nfunction called ``sidpy.proc.comp_utils.parallel_compute()`` that simplifies the process to a single function call.\n\nIt is a lot **more straightforward** to provide the arguments and keyword arguments of the function that needs to be\napplied to the entire dataset. Furthermore, this function intelligently assigns the number of CPU cores for the\nparallel computation based on the size of the dataset and the computational complexity of the unit computation.\nFor instance, it scales down the number of cores for small datasets if each computation is short. It also ensures that\n1-2 cores fewer than all available cores are used by default so that the user can continue using their computer for\nother purposes while the computation runs.\n\nLets apply this ``parallel_compute`` to this problem:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "cpu_cores = 2\nargs = [[20, 60]]\nkwargs = {'num_steps': 30}\n\nt_0 = time.time()\n\n# Execute the parallel computation\nparallel_results = parallel_compute(raw_data, find_all_peaks,\n cores=cpu_cores, func_args=args,\n func_kwargs=kwargs,\n joblib_backend='multiprocessing')\n\ncores_vec.append(cpu_cores)\ntimes_vec.append(time.time()-t_0)\nprint('Parallel computation with {} cores took {} seconds'.format(cpu_cores, np.round(times_vec[-1], 2)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compare the results\nBy comparing the run-times for the two approaches, we see that the parallel computation is substantially faster than\nthe serial computation. Note that the numbers will differ between computers. Also, the computation was performed on\na relatively small dataset for illustrative purposes. The benefits of using such parallel computation will be far\nmore apparent for much larger datasets.\n\nLet's compare the results from both the serial and parallel methods to ensure they give the same results:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print('Result from serial computation: {}'.format(serial_results[pixel_ind]))\nprint('Result from parallel computation: {}'.format(parallel_results[pixel_ind]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simplifying the function\nNote that the ``width_bounds`` and ``num_steps`` arguments will not be changed from one pixel to another. It would be\ngreat if we didn't have to keep track of these constant arguments. We can use a very handy python tool called\n``partial()`` to do just this. Below, all we are doing is creating a new function that always passes our preferred\nvalues for ``width_bounds`` and ``num_steps`` arguments to find_all_peaks. While it may seem like this is unimportant,\nit is very convenient when setting up the parallel computing:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "find_peaks = partial(find_all_peaks, num_steps=30, width_bounds=[20, 60])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that even though ``width_bounds`` is an argument, it needs to be specified as though it were a keyword argument\nlike ``num_steps``.\nLet's try calling our simplified function, ``find_peaks()`` to make sure that it results in the same peak index for the\naforementioned chosen spectra:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print('find_peaks found peaks at index: {}'.format(find_peaks(h5_main[pixel_ind])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## More cores!\nLets use ``find_peaks()`` instead of ``find_all_peaks`` on the entire dataset but increase the number of cores to 3.\nNote that we do not need to specify ``func_kwargs`` anymore. Also note that this is a very simple function and the\nbenefits of ``partial()`` will be greater for more complex problems.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "cpu_cores = 3\n\nt_0 = time.time()\n\n# Execute the parallel computation\nparallel_results = parallel_compute(raw_data, find_peaks,\n cores=cpu_cores,\n joblib_backend='multiprocessing')\n\ncores_vec.append(cpu_cores)\ntimes_vec.append(time.time()-t_0)\nprint('Parallel computation with {} cores took {} seconds'.format(cpu_cores, np.round(times_vec[-1], 2)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Scalability\nNow lets see how the computational time relates to the number of cores.\nDepending on your computer (and what was running on your computer along with this computation), you are likely to see\ndiminishing benefits of additional cores beyond 2 cores for this specific problem in the plot below. This is because\nthe dataset is relatively small and each peak-finding operation is relatively quick. The overhead of adding additional\ncores quickly outweighs the speedup in distributing the work among multiple CPU cores.\n\n

Note

This documentation is being generated automatically by a computer in the cloud whose workload cannot be controlled\n or predicted. Therefore, the computational times reported in this document may not be consistent and can even be\n contradictory. For best results, we recommend that download and run this document as a jupyter notebook.\n\n If everything ran correctly, you should see the computational time decrease substantially from 1 to 2 cores but\n the decrease from 2 to 3 or 3 to 4 cores should be minimal or negligible.

\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "fig, axis = plt.subplots(figsize=(3.5, 3.5))\naxis.scatter(cores_vec, times_vec)\naxis.set_xlabel('CPU cores', fontsize=14)\naxis.set_ylabel('Compute time (sec)', fontsize=14)\nfig.tight_layout()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Best practices for parallel computing\n --------------------------------------\n\n While it may seem tempting to do everything in parallel, it is important to be aware of some of the trade-offs and\n best-practices for parallel computing (multiple CPU cores) when compared to traditional serial computing (single\n CPU core):\n\n * There is noticeable time overhead involved with setting up each compute worker (CPU core in this case).\n For very simple or small computations, this overhead may outweigh the speed-up gained with using multiple cores.\n * Parallelizing computations that read and write to files at each iteration may be actually be noticeably *slower*\n than serial computation since the cores will compete for rights to read and write to the file(s)\n and these input/output operations are by far the slowest components of the workflow. Instead, it makes sense to\n read large amounts of data from the necessary files once, perform the computation, and then write to the files once\n after all the computation is complete. In fact, this is what we automatically do in the ``Process`` class in pyUSID or pyNSID.\n Please see `another example <./plot_process.html>`_ on how to write a Process class to formalize data processing.\n\n .. note::\n ``parallel_compute()`` will revert to serial processing when called within the message passing interface (MPI)\n context in a high-performance computing (HPC) cluster. Due to conflicts between MPI, numpy, and joblib, it is\n recommended to use a pure MPI approach for computing instead of the MPI + OpenMP (joblib) paradigm.\n\n#######################################################################################################################\n Cleaning up\n ~~~~~~~~~~~\n Lets not forget to close and delete the temporarily downloaded file:\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "h5_file.close()\nos.remove(h5_path)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.1" } }, "nbformat": 4, "nbformat_minor": 0 }