Reusing Intermediaries with Dask

Dask provides a computational framework in which arrays and the computations on them are built up into a ‘task graph’ before being evaluated. Since opt_einsum is compatible with dask arrays, multiple contractions can be built into the same task graph, which then automatically reuses any shared arrays and contractions.
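To illustrate the lazy-evaluation model on its own (this snippet is independent of the example below, and the array values are arbitrary):

>>> import dask.array as da
>>> x = da.ones((10, 10), chunks=(5, 5))
>>> y = (x + x.T).sum()  # builds a task graph; nothing is computed yet
>>> y.compute()          # executing the graph returns a concrete value
200.0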

For example, imagine the two expressions:

>>> contraction1 = 'ab,dca,eb,cde'
>>> contraction2 = 'ab,cda,eb,cde'
>>> sizes = {l: 10 for l in 'abcde'}

The partial contraction 'ab,eb' is shared between them and therefore need only be done once (a manual demonstration of this follows the separate computations below). First, let’s set up some numpy arrays:

>>> terms1, terms2 = contraction1.split(','), contraction2.split(',')
>>> terms = set((*terms1, *terms2))
>>> terms
{'ab', 'cda', 'cde', 'dca', 'eb'}

>>> import numpy as np
>>> np_arrays = {s: np.random.randn(*(sizes[c] for c in s)) for s in terms}
>>> # filter the arrays needed for each expression
>>> np_ops1 = [np_arrays[s] for s in terms1]
>>> np_ops2 = [np_arrays[s] for s in terms2]

Typically we would compute these expressions separately:

>>> import opt_einsum as oe
>>> oe.contract(contraction1, *np_ops1)
array(114.78314052)

>>> oe.contract(contraction2, *np_ops2)
array(-75.55902751)
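To see concretely what can be shared, here is a hypothetical manual factoring in numpy: the 'ab,eb' piece is computed once and then fed into both remaining contractions (the intermediate name shared and the reworked expressions are ours, purely for illustration):

>>> # contract the shared 'ab,eb' part once...
>>> shared = oe.contract('ab,eb->ae', np_arrays['ab'], np_arrays['eb'])
>>> # ...then finish each expression from the common intermediate
>>> oe.contract('ae,dca,cde->', shared, np_arrays['dca'], np_arrays['cde'])
array(114.78314052)
>>> oe.contract('ae,cda,cde->', shared, np_arrays['cda'], np_arrays['cde'])
array(-75.55902751)

With dask, this kind of factoring happens automatically inside the task graph rather than by hand.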

However, if we use dask arrays we can combine the two operations, so let’s set those up:

>>> import dask.array as da
>>> da_arrays = {s: da.from_array(np_arrays[s], chunks=1000, name=s) for s in terms}
>>> da_arrays
{'ab': dask.array<ab, shape=(10, 10), dtype=float64, chunksize=(10, 10)>,
 'cda': dask.array<cda, shape=(10, 10, 10), dtype=float64, chunksize=(10, 10, 10)>,
 'cde': dask.array<cde, shape=(10, 10, 10), dtype=float64, chunksize=(10, 10, 10)>,
 'dca': dask.array<dca, shape=(10, 10, 10), dtype=float64, chunksize=(10, 10, 10)>,
 'eb': dask.array<eb, shape=(10, 10), dtype=float64, chunksize=(10, 10)>}
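Since every dimension here is only of size 10, each array fits within a single chunk, which can be confirmed via the arrays' .chunks attribute:

>>> da_arrays['ab'].chunks
((10,), (10,))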

>>> da_ops1 = [da_arrays[s] for s in terms1]
>>> da_ops2 = [da_arrays[s] for s in terms2]

Note that chunks is a required argument describing how each array is split into blocks for storage and computation (see the dask documentation on array creation). Now we can perform the contractions:

>>> # these won't be immediately evaluated
>>> dy1 = oe.contract(contraction1, *da_ops1, backend='dask')
>>> dy2 = oe.contract(contraction2, *da_ops2, backend='dask')

>>> # wrap them in delayed to combine them into the same computation
>>> from dask import delayed
>>> dy = delayed([dy1, dy2])
>>> dy
Delayed('list-3af82335-b75e-47d6-b800-68490fc865fd')

As suggested by the name Delayed, so far we only have a placeholder for the result. When we want to perform the computation, we can call:

>>> dy.compute()
[114.78314052155015, -75.55902750513113]

The above matches the canonical numpy results. The computation can even be handled by various schedulers (see the dask documentation on scheduling). Finally, to check that we are reusing intermediaries, we can view the task graph generated for the computation:

>>> dy.visualize(optimize_graph=True)
[image: ex_dask_reuse_graph.png, the task graph for the combined computation]
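Alternatively, as a hedged programmatic check (using dask's standard collection protocol, __dask_graph__), the merged graph should contain fewer tasks than the two separate graphs combined, since any shared tasks are stored under a single key:

>>> g1 = dict(dy1.__dask_graph__())
>>> g2 = dict(dy2.__dask_graph__())
>>> g = dict(dy.__dask_graph__())
>>> len(g) < len(g1) + len(g2)  # shared tasks are deduplicated
True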

Note

For sharing intermediates with other backends, see Sharing Intermediates. Dask graphs are particularly useful for reusing intermediates beyond just contractions, and can allow additional parallelization.
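For example, a small hedged sketch of extending the same graph past the contractions themselves (the follow-on operations here are arbitrary illustrations):

>>> # further lazy arithmetic reuses the existing dy1/dy2 subgraphs
>>> dz = delayed([dy1 * 2, dy1 + dy2])
>>> results = dz.compute()  # dy1 is evaluated once, though it feeds both entries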