.. note::
    The features described below are experimental and under development.

.. testsetup:: *

    import numpy as np
    
    from libertem.api import Context
    from libertem.executor.inline import InlineJobExecutor

    ctx = Context(executor=InlineJobExecutor())
    dataset = ctx.load("memory", datashape=(16, 16, 16, 16), sig_dims=2)

.. _`dask`:

Dask integration
================

.. versionadded:: 0.9.0

Since version 0.9, LiberTEM supports seamless integration with workflows that
are based on Dask arrays. That means Dask arrays can serve as input for LiberTEM, and
UDF computations can produce Dask arrays. Additionally,
:meth:`~libertem.contrib.daskadapter.make_dask_array` can create Dask arrays from LiberTEM datasets.

Linking with HyperSpy Lazy Signals
----------------------------------

LiberTEM's Dask integration features can be used to combine features from
HyperSpy and LiberTEM in a single analysis workflow:

.. toctree::
   :maxdepth: 2

   hyperspy-integration

.. _`scheduler`:

Scheduler
---------

LiberTEM uses Dask.distributed as the default method to execute tasks.
Unless instructed otherwise, LiberTEM keeps the default Dask scheduler as-is and only
uses the Dask :code:`Client` internally to make sure existing workflows keep running
as before. However, for closer integration it can be beneficial to use the same scheduler
for both LiberTEM and other Dask computations. There are several options for that,
illustrated in the sketch after this list:

* Set the LiberTEM Dask cluster as default Dask scheduler:

    * Use :code:`Context.make_with('dask-make-default')` (:meth:`~libertem.api.Context.make_with`)
    * Pass :code:`client_kwargs={'set_as_default': True}` 
      to :meth:`~libertem.executor.dask.DaskJobExecutor.connect` 
      or :meth:`~libertem.executor.dask.DaskJobExecutor.make_local`

* Use an existing Dask scheduler:

    * Use :code:`Context.make_with('dask-integration')` to start 
      an executor that is compatible with the current Dask scheduler.

* Use dask.delayed:

    * :class:`libertem.executor.delayed.DelayedJobExecutor` can 
      return UDF computations as Dask arrays. The scheduler will only be  
      determined when :code:`compute()` is called downstream using the 
      normal mechanisms of Dask.
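
As a rough sketch, the options above could look like this (the variable
names are only for illustration, and in practice one would typically pick
a single option per session):

.. code-block:: python

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor

    # Option 1: start a local LiberTEM Dask cluster and register it
    # as the default Dask scheduler for other Dask computations.
    ctx_default = Context.make_with('dask-make-default')

    # Option 2: create an executor that is compatible with the
    # currently active Dask scheduler.
    ctx_integration = Context.make_with('dask-integration')

    # Option 3: return UDF results as Dask arrays; the scheduler is
    # only determined when compute() is called downstream.
    ctx_delayed = Context(executor=DelayedJobExecutor())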

.. _daskarray:

Load datasets as Dask arrays
----------------------------

The :meth:`~libertem.contrib.daskadapter.make_dask_array` function can generate
a `distributed Dask array <https://docs.dask.org/en/latest/array.html>`_ from a
:class:`~libertem.io.dataset.base.DataSet` using its partitions as blocks. The
typical LiberTEM partition size is close to the optimum size for Dask array
blocks under most circumstances. The Dask array is accompanied by a map of
optimal workers. This map should be passed to the :meth:`compute` method so
that each block is constructed on a worker that holds the corresponding
partition in local storage.

.. testcode:: make_dask_array

    from libertem.contrib.daskadapter import make_dask_array

    # Construct a Dask array from the dataset
    # The second return value contains information
    # on workers that hold parts of a dataset in local
    # storage to ensure optimal data locality
    dask_array, workers = make_dask_array(dataset)

    result = dask_array.sum(axis=(-1, -2)).compute(workers=workers)

Load Dask arrays as datasets
----------------------------

LiberTEM datasets can be created from Dask arrays by using
:meth:`libertem.api.Context.load` with filetype :code:`'dask'`. See
:ref:`daskds` for details. The performance can vary and depends on chunking,
executor, I/O method and Dask array creation method. Please `contact us
<https://gitter.im/LiberTEM/Lobby>`_ or `create an Issue
<https://github.com/LiberTEM/LiberTEM/issues/new>`_ for questions, bug reports
and other feedback!

This basic example shows running a UDF on a Dask array:

.. testcode:: from_dask

    import dask.array as da
    from libertem.udf.sum import SumUDF

    # Create a Context that integrates well with
    # the current Dask scheduler
    ctx = Context.make_with('dask-integration')

    a = da.random.random((23, 42, 17, 19))
    
    ds = ctx.load('dask', a, sig_dims=2)
    res = ctx.run_udf(dataset=ds, udf=SumUDF())

    assert np.allclose(
        a.sum(axis=(0, 1)).compute(),
        res['intensity'].raw_data
    )

See also :ref:`executors` on how to set up compatible schedulers for
both Dask and LiberTEM.

.. _`delayed_udfs`:

Create Dask arrays with UDFs
----------------------------

Using a :class:`~libertem.executor.delayed.DelayedJobExecutor` with a
:class:`~libertem.api.Context` lets :meth:`~libertem.api.Context.run_udf`
return results as Dask arrays.
In addition to the usual :attr:`~libertem.common.buffers.BufferWrapper.data`
and :attr:`~libertem.common.buffers.BufferWrapper.raw_data` properties, which
provide results as numpy arrays eagerly, the results are made available
as dask arrays using the attributes
:attr:`~libertem.executor.utils.dask_buffer.DaskResultBufferWrapper.delayed_data`
and :attr:`~libertem.executor.utils.dask_buffer.DaskResultBufferWrapper.delayed_raw_data`.
The computation is only performed when the
:code:`compute()` method is called on a Dask array result.

.. testcode:: to_dask

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor
    
    from libertem.udf.sumsigudf import SumSigUDF

    ctx = Context(executor=DelayedJobExecutor())
    res = ctx.run_udf(dataset=dataset, udf=SumSigUDF())

    print(res['intensity'].delayed_data)

.. testoutput:: to_dask
   
   dask.array<reshape, shape=(16, 16), dtype=float32, chunksize=(..., ...), chunktype=numpy.ndarray>

This method allows creating large intermediate results of :code:`kind='nav'`
efficiently in the form of Dask arrays. The partial results can remain as
ephemeral array chunks on the workers only for as long as they are needed for
downstream calculations, instead of being transported through the cluster and
instantiated as the full result on the main node.

For calls to :meth:`~libertem.api.Context.run_udf` that run multiple UDFs
or use UDFs that return multiple results, it is strongly recommended
to compute all results with a single call to :code:`compute()`, in order to re-use
the computation over the dataset. By default, Dask does not cache intermediate
results from prior runs, so each individual call to :code:`compute()` requires
a complete re-run of the UDFs that were passed to :code:`run_udf`.
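
As an illustration, the following sketch computes the results of two UDFs
with a single :code:`dask.compute()` call (assuming :code:`dataset` refers
to a dataset loaded with the same :code:`Context`; the buffer names and
result indexing follow the UDFs used here):

.. code-block:: python

    import dask

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor
    from libertem.udf.sum import SumUDF
    from libertem.udf.sumsigudf import SumSigUDF

    ctx = Context(executor=DelayedJobExecutor())
    res = ctx.run_udf(dataset=dataset, udf=[SumUDF(), SumSigUDF()])

    # A single dask.compute() call evaluates both results while
    # running the UDFs over the dataset only once.
    sum_sig, sum_nav = dask.compute(
        res[0]['intensity'].delayed_data,
        res[1]['intensity'].delayed_data,
    )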

.. _`merge_all`:

.. _dask merge_all:

Merge function for Dask array results
-------------------------------------

.. note::
    See :ref:`udf merge_all` for more information.

LiberTEM already uses an efficient default merging method to create Dask arrays
for UDFs that only use :code:`kind='nav'` buffers and don't specify their own
:code:`merge()`.

For all UDFs that define their own :code:`merge()`,
:class:`~libertem.executor.delayed.DelayedJobExecutor` will use this
existing :code:`udf.merge()` function to assemble the final results, in the same
way it is used to assemble partial results in each partition. This is carried
out by wrapping the :code:`udf.merge()` in :code:`dask.delayed` calls.

However, the user can also specify a
:meth:`~libertem.udf.base.UDFMergeAllMixin.merge_all()` method on their UDF that
combines all partial results from the workers into the complete result in a single
step. This allows the :class:`~libertem.executor.delayed.DelayedJobExecutor` to
produce a streamlined task tree, which then gives Dask greater scope to
parallelise the merge step and reduce data transfers.

This example shows the task tree first with the built-in wrapper for :code:`udf.merge()`
and then the streamlined one after defining :code:`merge_all()`. In tests, the streamlined
variant was twice as fast as the built-in wrapper.

.. testsetup:: merge_all

    import numpy as np
    from libertem.udf.base import UDF
    from libertem.executor.delayed import DelayedJobExecutor

.. testcode:: merge_all

    class MySumUDF(UDF):
        def get_result_buffers(self):
            return {
                'intensity': self.buffer(kind='sig', dtype=self.meta.input_dtype)
            }

        def process_tile(self, tile):
            self.results.intensity[:] += np.sum(tile, axis=0)

        def merge(self, dest, src):
            dest.intensity[:] += src.intensity
    
    ctx = Context(executor=DelayedJobExecutor())
    result = ctx.run_udf(udf=MySumUDF(), dataset=dataset)

    result['intensity'].delayed_data.visualize()

.. image:: ./images/tree-default-merge.png
    :width: 300px
    :alt: Task tree with wrapper for standard merge.

.. testcode:: merge_all

    class MySumMergeUDF(MySumUDF):
        # Define the merge_all() method for the UDF above
        def merge_all(self, ordered_results):
            # List and not generator for NumPy dispatch to work
            chunks = [b.intensity for b in ordered_results.values()]
            # NumPy will dispatch the stacking to the appropriate method
            # for the chunks.
            # See also https://numpy.org/doc/stable/user/basics.dispatch.html
            stack = np.stack(chunks)
            # Perform computation on the stacked chunks
            # equivalent to the normal merge()
            intensity = stack.sum(axis=0)
            
            # Return a dictionary mapping buffer name to new content
            return {'intensity': intensity}
    
    result2 = ctx.run_udf(udf=MySumMergeUDF(), dataset=dataset)

    result2['intensity'].delayed_data.visualize()

.. image:: ./images/tree-merge-all.png
    :width: 300px
    :alt: Task tree with :code:`merge_all()`.

.. testcleanup:: merge_all
    
    assert np.allclose(
        result['intensity'].raw_data,
        result2['intensity'].raw_data,
    )

The argument :code:`ordered_results` is a dictionary of the partial results
for that UDF. The dictionary is keyed by :class:`~libertem.common.slice.Slice`
objects, one for each partition processed, and is ordered along the flat navigation dimension.
Each partial result is itself a dictionary with a key for each result declared in
:code:`udf.get_result_buffers()`. The :code:`ordered_results` dictionary is constructed
such that the :code:`merge_all` method can safely concatenate the elements in the case
of :code:`'nav'`-shaped results. Any applied ROI is automatically taken into account
after the call to :code:`merge_all`.

The return value must be a dictionary of merged result arrays whose keys match
the declared result buffers. It is not required to return merged results for
all declared buffers, but any buffer that is omitted will not contain results
from the computation and will typically remain filled with zeros.
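
As a sketch, a :code:`merge_all()` for a :code:`kind='nav'` buffer could
simply concatenate the ordered partial results (the UDF below is a
hypothetical example, not part of LiberTEM):

.. code-block:: python

    import numpy as np

    from libertem.udf.base import UDF


    class MyNavSumUDF(UDF):
        # Hypothetical UDF that sums over the signal dimensions
        # of each frame, producing a kind='nav' result.
        def get_result_buffers(self):
            return {
                'intensity': self.buffer(kind='nav', dtype='float32')
            }

        def process_frame(self, frame):
            self.results.intensity[:] = np.sum(frame)

        def merge(self, dest, src):
            dest.intensity[:] = src.intensity

        def merge_all(self, ordered_results):
            # The partial kind='nav' results cover disjoint slices of
            # the flat navigation dimension in order, so they can be
            # concatenated directly.
            intensity = np.concatenate(
                [b.intensity for b in ordered_results.values()]
            )
            return {'intensity': intensity}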


CUDA and scheduling
-------------------

A native LiberTEM Dask cluster uses resource tags to schedule work on CPUs or
CUDA devices, based on a UDF's capabilities. For Dask integration, a fallback is
implemented that allows CPU-capable computations to run on a native
LiberTEM Dask cluster without requiring resource tags. CUDA-only computations,
however, require passing the appropriate tags as the :code:`resources` argument to :code:`compute()`.

:meth:`libertem.executor.delayed.DelayedJobExecutor.get_resources_from_udfs` returns
the appropriate resources for a given set of UDFs based on their capabilities.
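
A sketch of how this could look, assuming a hypothetical CUDA-only UDF
:code:`MyCudaUDF` with an :code:`'intensity'` result buffer, and a
Dask.distributed cluster configured with matching worker resources:

.. code-block:: python

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor

    ctx = Context(executor=DelayedJobExecutor())
    udf = MyCudaUDF()  # hypothetical CUDA-only UDF
    res = ctx.run_udf(dataset=dataset, udf=udf)

    # Determine resource tags matching the capabilities of the UDFs
    resources = DelayedJobExecutor.get_resources_from_udfs([udf])

    # Pass the tags to Dask.distributed when computing the result
    res['intensity'].delayed_data.compute(resources=resources)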

.. note::
    At this time the combination of CUDA-requiring UDFs and
    :class:`~libertem.executor.delayed.DelayedJobExecutor` is not well-tested.
    At a minimum, the merge process carried out on the main node
    will not take place with GPU-backed Dask arrays, though this is
    under consideration for the future.