Dask Collections: Arrays#
Tip
For the interested user, we recommend the official dask documentation at https://docs.dask.org/en/stable/
Dask Collections consist of implementations for
Array (xarray, numpy)
DataFrame (tables, pandas)
Bag (low level dask)
Delayed (low level dask)
Futures (need a client)
This notebook
covers only the dask array. A dask array can be interpreted as a collection of numpy arrays.
works well on the python3/unstable kernel on DKRZ's Jupyterhub.
calculates and plots quantiles of a large ensemble dataset.
Even without distributed, dask arrays feature (see the short sketch after this list):
Parallel and larger-than-memory processing:
Only one piece of the array is processed at a time - the rest is not loaded into memory yet
Automatic parallelization over parts of the dask array - multithreading per default
Lazy evaluation: arrays are computed only when knowing the individual values is absolutely necessary. Until then, a Task Graph is filled with to-be-computed tasks.
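A minimal sketch of these properties with a synthetic dask array (independent of the CMIP data used later; the shape and chunk sizes are arbitrary choices):
[ ]:
import dask.array as da

# Lazy, chunked array: ~3.2 GB split into 100 chunks of 32 MB - nothing is loaded yet
x = da.random.random((20000, 20000), chunks=(2000, 2000))

# These operations only extend the task graph
y = (x - x.mean(axis=0)).std()

# Only .compute() walks the graph, chunk by chunk and multithreaded per default
print(y.compute())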
We will work with the following packages:
[1]:
import xarray as xr
from IPython.display import Image
import dask
from dask.diagnostics import ProgressBar
import hvplot.xarray
import numcodecs
from distributed import Client
import intake
A scheme of data processing within dask (by Anaconda):
[2]:
Image(url="https://docs.dask.org/en/stable/_images/dask-overview.svg")
[2]:
Arrays#
Opening data with xarray creates dask arrays per default if you use open_mfdataset. Xarray will also use dask as the backend for arrays if you specify the chunks keyword argument. You can specify chunks="auto" so that xarray returns a dask array well-configured for common data analysis.
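A hedged sketch of such an opening call; the file path is a placeholder and not part of this workflow:
[ ]:
import xarray as xr

# chunks="auto" lets xarray/dask pick reasonable chunk sizes;
# the variables then become dask arrays instead of numpy arrays.
ds_example = xr.open_dataset("/path/to/some_model_output.nc", chunks="auto")
print(ds_example.chunks)  # mapping of dimension name -> chunk sizes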
In the following example, we work with the CMIP6 MPI Grand Ensemble. We use an intake catalog to get the data and select the air temperature ‘ta’:
[3]:
varname = "ta"
catalog = (
    "/work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/main.yaml"
)
cat = intake.open_catalog(catalog)
ds = cat["CFday"].to_dask()[[varname]]
This is what intake does under the hood, i.e. the next cell returns the same ds as before:
[4]:
%%capture
dsx = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
)[[varname]]
Larger than memory
The dataset has a size of about 10 TB, which exceeds our available memory, but we are still able to open and work with it.
[5]:
ds.nbytes / 1024**4
[5]:
9.686476768772991
One can access the array of a dataset variable with .data, e.g.:
[4]:
type(ds[varname].data)
[4]:
dask.array.core.Array
[5]:
ds[varname].data
[5]:
We can see that one chunk has a size of 3.3 MB. Dask usually operates on a chunk-by-chunk basis, thus:
Important
We need to ensure that all chunks immediately participating in a computation fit into memory. A good rule of thumb is to have enough memory for about 10-20 chunks per CPU core.
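A small sketch for checking this rule of thumb on our variable, using only attributes of the dask array opened above:
[ ]:
import numpy as np

arr = ds[varname].data  # the underlying dask array
chunk_mib = np.prod(arr.chunksize) * arr.dtype.itemsize / 1024**2
print(f"{arr.npartitions} chunks of about {chunk_mib:.1f} MiB each")
# Following the rule of thumb, each CPU core should have roughly
# 10-20 x chunk_mib of memory available.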
Coordinates of a dataset are usually loaded into memory, so they are numpy arrays:
[6]:
type(ds["lat"].data)
[6]:
numpy.ndarray
[7]:
ds["lat"].data
[7]:
array([-88.57216851, -86.72253095, -84.86197029, -82.99894164,
-81.13497684, -79.27055903, -77.40588808, -75.54106145,
-73.67613231, -71.81113211, -69.94608065, -68.08099099,
-66.21587211, -64.35073041, -62.48557052, -60.62039593,
-58.75520927, -56.8900126 , -55.02480754, -53.15959537,
-51.29437714, -49.4291537 , -47.56392575, -45.69869388,
-43.83345858, -41.96822027, -40.1029793 , -38.23773599,
-36.37249059, -34.50724334, -32.64199444, -30.77674406,
-28.91149237, -27.0462395 , -25.18098558, -23.31573073,
-21.45047504, -19.58521861, -17.71996153, -15.85470387,
-13.98944571, -12.12418712, -10.25892817, -8.39366891,
-6.5284094 , -4.66314971, -2.79788988, -0.93262997,
0.93262997, 2.79788988, 4.66314971, 6.5284094 ,
8.39366891, 10.25892817, 12.12418712, 13.98944571,
15.85470387, 17.71996153, 19.58521861, 21.45047504,
23.31573073, 25.18098558, 27.0462395 , 28.91149237,
30.77674406, 32.64199444, 34.50724334, 36.37249059,
38.23773599, 40.1029793 , 41.96822027, 43.83345858,
45.69869388, 47.56392575, 49.4291537 , 51.29437714,
53.15959537, 55.02480754, 56.8900126 , 58.75520927,
60.62039593, 62.48557052, 64.35073041, 66.21587211,
68.08099099, 69.94608065, 71.81113211, 73.67613231,
75.54106145, 77.40588808, 79.27055903, 81.13497684,
82.99894164, 84.86197029, 86.72253095, 88.57216851])
We convert dask arrays into numpy arrays by loading them.
Forcing data into memory#
While libraries usually figure out themselves when data must be loaded into memory, there may be situations in which you want to trigger a computation manually and keep the data in memory (e.g. when you know you will access the same data many times and computing it once is cheaper than recomputing it over and over).
xarray provides the following options to force data into memory:
.compute() will convert a Dataset (or DataArray) backed by a dask array into one backed by a numpy array, i.e. it will compute the data and load it into the memory of your current python interpreter.
.load() will do the same as .compute() but will also modify the input Dataset in-place.
.persist() will keep the data as a dask array, but will replace the underlying task graph with an already computed in-memory representation of the array. If you use dask with a distributed cluster, the data may reside in any of the memories of your cluster.
All of these methods bring the data of the dask array into physical memory; it is no longer lazy and occupies memory space. We therefore need to be careful about what we actually load, otherwise we risk a memory overflow. We should only compute data that we really need in the working environment.
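A short sketch of the three options on a deliberately small subset (slicing first keeps the amount of data loaded tiny):
[ ]:
subset = ds[varname].isel(ensemble_member=0, time=0, lev=0)

computed = subset.compute()   # new numpy-backed DataArray; `subset` stays lazy
persisted = subset.persist()  # still dask-backed, but the chunks are held in memory
subset.load()                 # like .compute(), but converts `subset` in place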
There’s one more shortcut: .values, which is basically the same as .compute().data:
[8]:
ds[varname][0, 0, 0, 0, 0].values
[8]:
array(245.68579102)
[9]:
ds[varname][0, 0, 0, 0, 0].data
[9]:
When to NOT use dask#
Random access, i.e. fine subsetting
Opening data with xarray can be lazy even without dask. If the data of interest fits into memory, you do not necessarily need dask. You can open the dataset with chunks=None and get a lazy representation handled by xarray alone. Note that in the following you are working with numpy arrays.
[10]:
%%capture
varname = "ta"
dsxarraylazy = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
    chunks=None,
)
[11]:
dsxarraylazy
[11]:
<xarray.Dataset>
Dimensions:          (ensemble_member: 51, time: 60265, lev: 47, bnds: 2, lat: 96, lon: 192)
Coordinates:
  * ensemble_member  (ensemble_member) object 'r10i1p1f1' ... 'r9i1p1f1'
  * lat              (lat) float64 -88.57 -86.72 -84.86 ... 84.86 86.72 88.57
  * lev              (lev) float64 0.9961 0.9826 0.959 ... 4.225e-05 9.816e-06
  * lon              (lon) float64 0.0 1.875 3.75 5.625 ... 354.4 356.2 358.1
  * time             (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12-31...
Dimensions without coordinates: bnds
Data variables:
    ap               (ensemble_member, time, lev) float64 ...
    ap_bnds          (time, lev, bnds) float64 ...
    b                (ensemble_member, time, lev) float64 ...
    b_bnds           (time, lev, bnds) float64 ...
    lat_bnds         (time, lat, bnds) float64 ...
    lev_bnds         (time, lev, bnds) float64 ...
    lon_bnds         (time, lon, bnds) float64 ...
    ps               (ensemble_member, time, lat, lon) float32 ...
    ta               (ensemble_member, time, lev, lat, lon) float32 ...
    time_bnds        (time, bnds) datetime64[ns] ...
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  0.0
    cmor_version:           3.5.0
    ...                     ...
    table_id:               CFday
    table_info:             Creation Date:(09 May 2019) MD5:e6ef8ececc8f33864...
    title:                  MPI-ESM1-2-LR output prepared for CMIP6
    tracking_id:            hdl:21.14100/58c496bc-8e9d-47cb-bfd7-9b28d399e816
    variable_id:            ta
    variant_label:          r1i1p1f1
[12]:
dsxarraylazy[varname].isel(time=0, ensemble_member=0, lev=0).data
[12]:
array([[245.68579, 245.64478, 245.60571, ..., 245.79517, 245.76001,
245.7229 ],
[247.55103, 247.5315 , 247.51782, ..., 247.6565 , 247.61157,
247.57642],
[248.30298, 248.41821, 248.54517, ..., 248.0979 , 248.13696,
248.20728],
...,
[241.01587, 241.40063, 241.7893 , ..., 239.9397 , 240.27954,
240.63892],
[242.66235, 242.79321, 242.92993, ..., 242.30688, 242.41626,
242.5354 ],
[245.23657, 245.23462, 245.23462, ..., 245.2522 , 245.24634,
245.24048]], dtype=float32)
Task Graph#
All xarray functions that can be interpreted lazily by dask will be added to dask's task graph.
This task graph is only evaluated if needed.
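For small arrays, the graph can also be rendered as an image with the dask array's .visualize() method (requires the graphviz package). A hedged sketch that we do not run here, since the graph of this dataset is far too large:
[ ]:
small = ds[varname].isel(ensemble_member=0, time=0, lev=0)
# Writes an SVG of the task graph; only sensible for graphs with few tasks
small.data.visualize(filename="task_graph.svg")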
[13]:
ds[varname].data.dask
[13]:
HighLevelGraph
HighLevelGraph with 2 layers and 3073516 keys from all layers.
Layer1: original
original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
Layer2: open_dataset
open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
[14]:
# 1. Only interested in first level
ds_firstlevel = ds[[varname]].isel(lev=0)
ds_firstlevel[varname].data.dask
[14]:
HighLevelGraph
HighLevelGraph with 3 layers and 6147031 keys from all layers.
Layer1: original
original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
Layer2: open_dataset
open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
Layer3: getitem
getitem-b152f8d0f6e17c47d63d2b4f78a1252d
For large datasets like this one, the task graph is huge because the array consists of O(million) chunks. So while we are building our workflow on top of this task graph, we have to think about how to do so in an optimal manner.
Optimized opening#
We can already open the dataset with chunks that fit our analysis. As we assume that we
are only interested in one level
want to compute quantiles over the entire ensemble_member dimension
we can set the following chunks in the open_ function of xarray:
[15]:
%%capture
varname = "ta"
ds = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
    chunks=dict(ensemble_member=51, lev=1),
)
[16]:
# 1. Only interested in first level
ds_firstlevel = ds[[varname]].isel(lev=0)
ds_firstlevel[varname].data.dask
[16]:
HighLevelGraph
HighLevelGraph with 3 layers and 2892721 keys from all layers.
Layer1: original
original-open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208
Layer2: open_dataset
open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208
Layer3: getitem
getitem-8bb71842dd54b0d0aac0e83dff0ccd61
We end up with about half of the total number of keys compared to the first approach.
We can further use the optimize function of dask, which applies different optimization routines to the task graph:
[17]:
ds_firstlevel_optimized = dask.optimize(ds_firstlevel)
ds_firstlevel_optimized = ds_firstlevel_optimized[0]
ds_firstlevel_optimized[varname].data.dask
[17]:
HighLevelGraph
HighLevelGraph with 1 layers and 120531 keys from all layers.
Layer1: getitem
getitem-8bb71842dd54b0d0aac0e83dff0ccd61
Which functions are evaluated lazily?#
Many of the numpy functions:
[18]:
daskmethods = dir(ds_firstlevel_optimized[varname].data)
numpymethods = dir(ds_firstlevel_optimized["lat"].data)
print([a for a in numpymethods if a in daskmethods])
['T', '__abs__', '__add__', '__and__', '__array__', '__array_function__', '__array_priority__', '__array_ufunc__', '__bool__', '__class__', '__complex__', '__deepcopy__', '__delattr__', '__dir__', '__divmod__', '__doc__', '__eq__', '__float__', '__floordiv__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__index__', '__init__', '__init_subclass__', '__int__', '__invert__', '__iter__', '__le__', '__len__', '__lshift__', '__lt__', '__matmul__', '__mod__', '__mul__', '__ne__', '__neg__', '__new__', '__or__', '__pos__', '__pow__', '__radd__', '__rand__', '__rdivmod__', '__reduce__', '__reduce_ex__', '__repr__', '__rfloordiv__', '__rlshift__', '__rmatmul__', '__rmod__', '__rmul__', '__ror__', '__rpow__', '__rrshift__', '__rshift__', '__rsub__', '__rtruediv__', '__rxor__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__sub__', '__subclasshook__', '__truediv__', '__xor__', 'all', 'any', 'argmax', 'argmin', 'astype', 'choose', 'clip', 'conj', 'copy', 'cumprod', 'cumsum', 'dot', 'dtype', 'flatten', 'imag', 'itemsize', 'max', 'mean', 'min', 'nbytes', 'ndim', 'nonzero', 'prod', 'ravel', 'real', 'repeat', 'reshape', 'round', 'shape', 'size', 'squeeze', 'std', 'sum', 'swapaxes', 'trace', 'transpose', 'var', 'view']
And which numpy methods are not available on the dask array?
[19]:
print([a for a in numpymethods if not a in daskmethods])
['__array_finalize__', '__array_interface__', '__array_prepare__', '__array_struct__', '__array_wrap__', '__class_getitem__', '__contains__', '__copy__', '__delitem__', '__dlpack__', '__dlpack_device__', '__iadd__', '__iand__', '__ifloordiv__', '__ilshift__', '__imatmul__', '__imod__', '__imul__', '__ior__', '__ipow__', '__irshift__', '__isub__', '__itruediv__', '__ixor__', '__setstate__', 'argpartition', 'argsort', 'base', 'byteswap', 'compress', 'conjugate', 'ctypes', 'data', 'diagonal', 'dump', 'dumps', 'fill', 'flags', 'flat', 'getfield', 'item', 'itemset', 'newbyteorder', 'partition', 'ptp', 'put', 'resize', 'searchsorted', 'setfield', 'setflags', 'sort', 'strides', 'take', 'tobytes', 'tofile', 'tolist', 'tostring']
[20]:
# 2.Calculate a field mean
ds_fieldmean = ds_firstlevel_optimized[[varname]].mean(dim=["lat", "lon"])
ds_fieldmean[varname].data.dask
[20]:
HighLevelGraph
HighLevelGraph with 3 layers and 241061 keys from all layers.
Layer1: getitem
getitem-8bb71842dd54b0d0aac0e83dff0ccd61
Layer2: mean_chunk
mean_chunk-38f0dfc3230035e469b89cc8175f5473
Layer3: mean_agg-aggregate
mean_agg-aggregate-036593c2734e4aa9fbee4773a240d2d9
[21]:
ds_fieldmean[varname].data
[21]:
Xarray’s lazy functions:#
E.g. groupby operation is lazy:
[22]:
# 3. Yearly means:
ds_fieldmean_ym = ds_fieldmean.groupby("time.year").mean()
ds_fieldmean_ym
[22]:
<xarray.Dataset>
Dimensions:          (year: 165, ensemble_member: 51)
Coordinates:
  * ensemble_member  (ensemble_member) object 'r10i1p1f1' ... 'r9i1p1f1'
    lev              float64 0.9961
  * year             (year) int64 1850 1851 1852 1853 ... 2011 2012 2013 2014
Data variables:
    ta               (year, ensemble_member) float32 dask.array<chunksize=(1, 51), meta=np.ndarray>
We now have a dask graph with 165 chunks in 1324 graph layers, so displaying the underlying task graph would get us into trouble and we had better not do it.
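Instead of rendering the repr, we can query the size of the graph directly - a small sketch using the HighLevelGraph attributes shown above:
[ ]:
hlg = ds_fieldmean_ym[varname].data.dask
print(len(hlg.layers), "layers with", len(hlg), "keys in total")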
Make custom numpy operations lazy#
If we have a package or function that works only on numpy arrays, we can use apply_ufunc or map_blocks to apply it to the dask array lazily. This is rather easy if the shape of the array does not change. If a reduction takes place, it becomes more complex and more kwargs have to be provided.
One simple example is lossy compression. We can lossily compress all chunks with numcodecs, keeping only 12 mantissa bits:
[23]:
def compress_data(partds):
    import numcodecs

    rounding = numcodecs.BitRound(keepbits=12)
    return rounding.decode(rounding.encode(partds))
[24]:
ds_fieldmean_ym_lossy = xr.apply_ufunc(
    compress_data, ds_fieldmean_ym, dask="parallelized"
)
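The same per-chunk compression can be expressed with the dask-level map_blocks; a hedged sketch (the shape and dtype of each block stay unchanged, so no extra kwargs are needed):
[ ]:
da_ta = ds_fieldmean_ym[varname]
# map_blocks applies compress_data to every dask chunk lazily
ta_lossy = da_ta.copy(data=da_ta.data.map_blocks(compress_data, dtype=da_ta.dtype))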
We now continue with computing quantiles. We could do that in one step; however, we would like to save each quantile as a new data variable in the dataset, so we use a for loop:
[25]:
# 3. Quantiles:
quantiles = [0.01, 0.25, 0.5, 0.75, 0.99]
quantiles_str = [str(a) for a in quantiles]
for sq, q in zip(quantiles_str, quantiles):
    ds_fieldmean_ym[sq] = ds_fieldmean_ym[varname].quantile(q, dim="ensemble_member")
quantiles = ds_fieldmean_ym[quantiles_str]
Rechunking#
In general, you should already open the data with the chunk setting you need for your analysis.
Some functions like .quantile require that their core dimension consists of exactly one chunk of data. If you are in that situation, you can rechunk the data with .chunk to what you need.
Only an xarray dataset configured as follows can be used for .quantile:
[26]:
ds_fieldmean_ym_rechunked = ds_fieldmean_ym.chunk(ensemble_member=51)
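A quick way to verify that the core dimension now spans a single chunk, using xarray's chunksizes mapping:
[ ]:
# One tuple entry per chunk along the dimension; a single entry means one chunk
print(ds_fieldmean_ym_rechunked[varname].chunksizes["ensemble_member"])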
Schedule#
We have finally reached the end point of our workflow. We can see that there are still a lot of task layers. This is probably because the years are calculated one after the other.
We can check whether dask.optimize can fuse these tasks.
[27]:
quantiles_optimized = dask.optimize(quantiles)
quantiles_optimized = quantiles_optimized[0]
quantiles_optimized
[27]:
<xarray.Dataset>
Dimensions:   (year: 165)
Coordinates:
    lev       float64 0.9961
  * year      (year) int64 1850 1851 1852 1853 1854 ... 2010 2011 2012 2013 2014
    quantile  float64 0.01
Data variables:
    0.01      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.25      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.5       (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.75      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.99      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
To test whether our workflow can be computed in a reasonable amount of time, we can compute a small subset, e.g. two years.
Note that there is also the ProgressBar, which gives us updates on the status of the calculation:
[28]:
quantiles_test = quantiles_optimized.isel(year=slice(0, 2))
quantiles_test
[28]:
<xarray.Dataset>
Dimensions:   (year: 2)
Coordinates:
    lev       float64 0.9961
  * year      (year) int64 1850 1851
    quantile  float64 0.01
Data variables:
    0.01      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.25      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.5       (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.75      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.99      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
[39]:
with ProgressBar():
    quantiles_test = quantiles_test.compute()
[########################################] | 100% Completed | 115.12 s
We can see that our two years take about 115 s to be finalized. Extrapolating to all 165 years gives roughly 115 s * 165 / 2 ≈ 9500 s, i.e. about 2.6 hours for the entire array.
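The same back-of-the-envelope extrapolation as a small sketch (the timing is taken from the ProgressBar output above):
[ ]:
seconds_for_two_years = 115.12
n_years = quantiles_optimized.sizes["year"]  # 165
estimate_hours = seconds_for_two_years * n_years / 2 / 3600
print(f"~{estimate_hours:.1f} hours on this machine without a distributed cluster")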
At this stage, we could set up a cluster with a client (see distributed) to submit the work to a distributed system.
[42]:
client = Client()
client
[42]:
Client: Client-9a26239b-d571-11ee-b086-080038c049a7
Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: /proxy/8787/status

LocalCluster: ff412f23
Workers: 16    Total threads: 256    Total memory: 250.00 GiB
Status: running    Using processes: True

Scheduler: Scheduler-d221141a-25c0-4152-9e3e-482c4714bc58
Comm: tcp://127.0.0.1:38411    Started: Just now

Workers 0-15: 16 threads and 15.62 GiB memory each, with a nanny process and a local directory under /tmp/dask-worker-space/
[ ]:
quantiles_optimized = quantiles_optimized.compute()
Plot#
To plot the quantiles, we use hvplot.xarray and the following combination of area and line plots:
[40]:
def plot_quantiles_area(quantiles, ylabel):
    ninetyeight = quantiles.hvplot.area(
        grid=True,
        y="0.01",
        y2="0.99",
        width=820,
        color="aliceblue",
        label="98% of all values",
    )
    fifty = quantiles.hvplot.area(
        grid=True,
        y="0.25",
        y2="0.75",
        width=820,
        color="aqua",
        label="50% of all values",
    )
    median = quantiles.hvplot.line(
        grid=True, y="0.5", color="cornflowerblue", label="median"
    )
    comb = (ninetyeight * fifty * median).opts(
        ylabel=ylabel, legend_position="bottom_right"
    )
    return comb
[44]:
plot_quantiles_area(quantiles_optimized, varname)
[44]:
[ ]: