Dask chunks and best practices#

This notebook is meant to be used in DKRZ’s Jupyterhub with the python3 unstable kernel.

We aim to understand

  • the different types of chunks, especially

    • storage chunks

      • The smallest unit of binary data that is stored within a climate data file (zarr, netcdf, grib)

    • dask chunks

      • The chunks that dask uses inside a dask array. The smallest unit for which dask executes a workflow at once.

  • the best practices recommended by dask.

We use the following packages which needs to be in your environment/kernel:

[1]:
import intake
import xarray as xr
import hvplot.xarray
import numpy as np
from xhistogram.xarray import histogram
import dask
from dask.distributed import performance_report
from distributed import Client
import time
import glob

We mainly look at high frequency (30minutes) high resolution (zoom level 10 ICON data from NextGEMS Cycle 3 and select precipitation as an example variable.

We get the data from an intake catalog:

[2]:
CATALOG_LINK = "https://data.nextgems-h2020.eu/catalog.yaml"
VARID = "pr"
[3]:
selection = dict(zoom=10, time="PT30M")
[4]:
cat = intake.open_catalog(CATALOG_LINK)
cat
data_nextgems-h2020_eu:
  args:
    path: https://data.nextgems-h2020.eu/catalog.yaml
  description: ''
  driver: intake.catalog.local.YAMLFileCatalog
  metadata: {}

Chunks#

TLDR,

Dask chunks need to

  • be smaller than your memory

Dask chunks should

  • be about 100MB

  • not exceed about O(1 mio) in total when sent to the scheduler i.e. when computed

  • reflect a multiple of the storage chunks

  • not cut storage chunks

  • fit to your analysis

with open_zarr and intake configurations#

In the following, we have a look on the NGC3 zarr data and open it in 3 different ways:

  1. The catalog is configured such that xarray opens data lazily and encoded as numpy arrays (no dask)

[5]:
ds_delayed = cat["ICON"]["ngc3028"](**selection).to_dask()[VARID]
ds_delayed
[5]:
<xarray.DataArray 'pr' (time: 96480, cell: 12582912)>
[1213999349760 values with dtype=float32]
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

The same as:

[6]:
ds_delayed = xr.open_dataset(
    f"/fastdata/ka1081/nextgems_cycle3/ngc3028/ngc3028_{selection['time']}_{str(selection['zoom'])}.zarr",
    engine="zarr",
    chunks=None,
)[VARID]
  1. Let dask create dask chunks when opening with chunks="auto":

[7]:
ds_auto = cat["ICON"]["ngc3028"](**selection, chunks="auto").to_dask()[VARID]
ds_auto
[7]:
<xarray.DataArray 'pr' (time: 96480, cell: 12582912)>
dask.array<open_dataset-dd9675028d7e43a50230dc83a922860apr, shape=(96480, 12582912), dtype=float32, chunksize=(32, 524288), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

The same as:

[8]:
ds_auto = xr.open_dataset(
    f"/fastdata/ka1081/nextgems_cycle3/ngc3028/ngc3028_{selection['time']}_{str(selection['zoom'])}.zarr",
    engine="zarr",
    chunks="auto",
)[VARID]

But why is the .encoding["chunks"] different to the dask chunks?

[9]:
ds_auto.encoding["chunks"]
[9]:
(16, 262144)

Because the latter are storage chunks. It is the shape of a chunk written to the storage.

Storage chunks have a different optimal size than dask chunks. Each storage chunk can be compressed individually. Therefore, it depends a lot on the chosen compression tool what the optimal size of a storage chunk is. Also, optimal access to the storage backend hardware affects the optimal storage chunk size.

In general, one can say that 1-10MB is a good storage chunk size for disk/lustre and <100MB is still OK for most backends.

  1. Force dask to use storage chunks when opening with chunks={}:

[10]:
ds_storage = cat["ICON"]["ngc3028"](**selection, chunks={}).to_dask()[VARID]
ds_storage
[10]:
<xarray.DataArray 'pr' (time: 96480, cell: 12582912)>
dask.array<open_dataset-11ac8388882b0270658ade8f13719e04pr, shape=(96480, 12582912), dtype=float32, chunksize=(16, 262144), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

The same as:

[11]:
ds_storage = xr.open_dataset(
    f"/fastdata/ka1081/nextgems_cycle3/ngc3028/ngc3028_{selection['time']}_{str(selection['zoom'])}.zarr",
    engine="zarr",
    chunks={},
)[VARID]

We look at a

  • 5TB array

  • which is chunked in all dimensions both in storage chunks and dask chunks

  • and results in a dask graph with O(100,000) chunks

Note

To load one individual dask chunk is one task in the dask’s task graph.

When we compare chunks of ds_auto and ds_storage, we find that

  • Storage chunks of precipitation are small.

  • Dask aims to create chunks with about 100MB in size

  • Dask is clever to use a multiple of the storage chunk size:

[12]:
print(ds_auto.chunksizes["time"][0] / ds_storage.chunksizes["time"][0])
print(ds_auto.chunksizes["cell"][0] / ds_storage.chunksizes["cell"][0])
2.0
2.0

That means, we have 2²=4 storage chunks within one dask chunk.

Dask is able to load these storage chunks at once.

Note

NOTE: chunks("auto") returns well-configured dask chunks for general analysis purpose so that a chunk has about 100MB.

Config and default setting#

Sometimes 100MB memory is not the best option for a general approach. You can change this dask setting with:

[13]:
# dask.config.set({"array.slicing.split_large_chunks": False})
# dask.config.set({"array.chunk-size": "10 MB"})
[14]:
# find out the target dask chunk size:
dask.config.config["array"]["chunk-size"]
[14]:
'128MiB'
[15]:
# set the target dask chunk size:
dask.config.set({"array.chunk-size": "10 MB"})
[15]:
<dask.config.set at 0x7fff7ccdb2e0>

If we now open the data with chunks="auto", we get:

[16]:
ds_auto_config = xr.open_dataset(
    f"/fastdata/ka1081/nextgems_cycle3/ngc3028/ngc3028_{selection['time']}_{str(selection['zoom'])}.zarr",
    engine="zarr",
    chunks="auto",
)[VARID]
ds_auto_config
/sw/spack-levante/mambaforge-23.1.0-1-Linux-x86_64-3boc6i/lib/python3.10/site-packages/xarray/core/dataset.py:255: UserWarning: The specified Dask chunks separate the stored chunks along dimension "time" starting at index 12. This could degrade performance. Instead, consider rechunking after loading.
  warnings.warn(
/sw/spack-levante/mambaforge-23.1.0-1-Linux-x86_64-3boc6i/lib/python3.10/site-packages/xarray/core/dataset.py:255: UserWarning: The specified Dask chunks separate the stored chunks along dimension "cell" starting at index 208332. This could degrade performance. Instead, consider rechunking after loading.
  warnings.warn(
[16]:
<xarray.DataArray 'pr' (time: 96480, cell: 12582912)>
dask.array<open_dataset-1a14ada58f9017b077d18ef7caddcac1pr, shape=(96480, 12582912), dtype=float32, chunksize=(12, 208332), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface
[17]:
# set the target dask chunk size:
dask.config.set({"array.chunk-size": "128 MB"})
[17]:
<dask.config.set at 0x7fff7c69cd00>

open_mfdataset on netcdf#

It is getting even more complicated. open_mfdataset behaves differently!

Note

Using chunks=auto does not always lead to dask chunks being a multiple of storage chunks

Consider this example:

[18]:
varname = "ta"
filelist = glob.glob(
    f"/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp126/r1i1p1f1/CFday/{varname}/gn/v20190710/*.nc"
)
[19]:
openmf_auto = xr.open_mfdataset(filelist, chunks="auto")
openmf_auto
[19]:
<xarray.Dataset>
Dimensions:    (time: 31411, bnds: 2, lev: 95, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
  * lev        (lev) float64 0.9961 0.9826 0.959 ... 2.31e-05 9.816e-06
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(1826, 2), meta=np.ndarray>
    lev_bnds   (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    ap         (time, lev) float64 dask.array<chunksize=(1826, 95), meta=np.ndarray>
    b          (time, lev) float64 dask.array<chunksize=(1826, 95), meta=np.ndarray>
    ps         (time, lat, lon) float32 dask.array<chunksize=(1142, 118, 237), meta=np.ndarray>
    ap_bnds    (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    b_bnds     (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(1826, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(1826, 384, 2), meta=np.ndarray>
    ta         (time, lev, lat, lon) float32 dask.array<chunksize=(423, 21, 42, 85), meta=np.ndarray>
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            ScenarioMIP
    branch_method:          standard
    branch_time_in_child:   60265.0
    branch_time_in_parent:  60265.0
    contact:                cmip6-mpi-esm@dkrz.de
    ...                     ...
    title:                  MPI-ESM1-2-HR output prepared for CMIP6
    variable_id:            ta
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by DKRZ is licensed und...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/c30b8347-4287-4f20-ab6f-d8ddb1198635

This configuration gives us chunks which split the storage chunks.

[20]:
storage_chunks = openmf_auto[varname].encoding["chunksizes"]
storage_chunks
[20]:
(1, 48, 96, 192)

This is the second worst configuration you can have in dask. On top of that, chunks={} does not gives us storage chunks either but one file per chunk:

[21]:
one_file_per_chunk = xr.open_mfdataset(
    "/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp126/r1i1p1f1/CFday/ta/gn/v20190710/*.nc",
    chunks={},
)
one_file_per_chunk
[21]:
<xarray.Dataset>
Dimensions:    (time: 31411, bnds: 2, lev: 95, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
  * lev        (lev) float64 0.9961 0.9826 0.959 ... 2.31e-05 9.816e-06
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(1826, 2), meta=np.ndarray>
    lev_bnds   (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    ap         (time, lev) float64 dask.array<chunksize=(1826, 95), meta=np.ndarray>
    b          (time, lev) float64 dask.array<chunksize=(1826, 95), meta=np.ndarray>
    ps         (time, lat, lon) float32 dask.array<chunksize=(1826, 192, 384), meta=np.ndarray>
    ap_bnds    (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    b_bnds     (time, lev, bnds) float64 dask.array<chunksize=(1826, 95, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(1826, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(1826, 384, 2), meta=np.ndarray>
    ta         (time, lev, lat, lon) float32 dask.array<chunksize=(1826, 95, 192, 384), meta=np.ndarray>
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            ScenarioMIP
    branch_method:          standard
    branch_time_in_child:   60265.0
    branch_time_in_parent:  60265.0
    contact:                cmip6-mpi-esm@dkrz.de
    ...                     ...
    title:                  MPI-ESM1-2-HR output prepared for CMIP6
    variable_id:            ta
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by DKRZ is licensed und...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/c30b8347-4287-4f20-ab6f-d8ddb1198635

For this case, this is the worst configuration you can have in dask because the chunk size most likely exceeds the available memory so that you cannot really work with this dataset at all.

Tip

When using open_mfdataset, ensure that the chunks are well defined. Consider reopening with a multiple of the storage chunk sizes.

[22]:
open_chunks = dict(
    time=2,
    lev=storage_chunks[1],  # storage chunk levels,
    lon=len(one_file_per_chunk["lon"]),
    lat=len(one_file_per_chunk["lat"]),
)
multiple_storage_chunks = xr.open_mfdataset(
    "/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp126/r1i1p1f1/CFday/ta/gn/v20190710/*.nc",
    chunks=open_chunks,  # or a multiple storage_chunks
)
multiple_storage_chunks
[22]:
<xarray.Dataset>
Dimensions:    (time: 31411, bnds: 2, lev: 95, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
  * lev        (lev) float64 0.9961 0.9826 0.959 ... 2.31e-05 9.816e-06
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(2, 2), meta=np.ndarray>
    lev_bnds   (time, lev, bnds) float64 dask.array<chunksize=(1826, 48, 2), meta=np.ndarray>
    ap         (time, lev) float64 dask.array<chunksize=(1826, 48), meta=np.ndarray>
    b          (time, lev) float64 dask.array<chunksize=(1826, 48), meta=np.ndarray>
    ps         (time, lat, lon) float32 dask.array<chunksize=(2, 192, 384), meta=np.ndarray>
    ap_bnds    (time, lev, bnds) float64 dask.array<chunksize=(1826, 48, 2), meta=np.ndarray>
    b_bnds     (time, lev, bnds) float64 dask.array<chunksize=(1826, 48, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(1826, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(1826, 384, 2), meta=np.ndarray>
    ta         (time, lev, lat, lon) float32 dask.array<chunksize=(2, 48, 192, 384), meta=np.ndarray>
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            ScenarioMIP
    branch_method:          standard
    branch_time_in_child:   60265.0
    branch_time_in_parent:  60265.0
    contact:                cmip6-mpi-esm@dkrz.de
    ...                     ...
    title:                  MPI-ESM1-2-HR output prepared for CMIP6
    variable_id:            ta
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by DKRZ is licensed und...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/c30b8347-4287-4f20-ab6f-d8ddb1198635

Best-practices#

In the following, we are running some example analysis to explore how dask works. For ds_auto, we want to know:

What is the quick and unweighted field mean of the first year?

[23]:
def calc_fieldmean_of_first_year(ds):
    dsy1 = list(dict(ds.groupby("time.year")).values())[0]
    dsy1mean = dsy1.mean(dim="cell")
    display(dsy1mean)
    gbs = dsy1.nbytes / 1024 / 1024 / 1024
    print(f"Going to process: {gbs} GBs")
    return dsy1mean.compute(), gbs

Do we need dask?#

[24]:
print(
    "The first year of the dataset is "
    f"{ds_delayed.isel(time=slice(0,2*24*365)).nbytes/1024/1024/1024}"
    " GBs large and does not fit into memory"
)
The first year of the dataset is 821.25 GBs large and does not fit into memory
[25]:
# ds_delayed.isel(time=slice(0,2*24*365)).mean(dim="time").compute()

Let’s quickly define a client to get a dashboard view. More on this in the distributed notebook.

[26]:
# client.close()
client = Client()

Diagnostic tools#

When we let dask actually work, we can use diagnostic-tools to inspect how our task-graph was evaluated from the client i.e. how performant the execution was. E.g., there is the performance_report. We use that in a with context as follows:

[27]:
s = time.time()
with performance_report(filename="dask-report-auto.html"):
    result, gbs = calc_fieldmean_of_first_year(ds_auto)
e = time.time()
print(f"Process speed: {gbs/(e-s)} GB/s")
<xarray.DataArray 'pr' (time: 16655)>
dask.array<mean_agg-aggregate, shape=(16655,), dtype=float32, chunksize=(32,), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2020-12-31T23:30:00
Going to process: 780.703125 GBs
Process speed: 6.697790692183614 GB/s

Lets compare this with the dataset with storage chunk dask chunks:

[28]:
s = time.time()
with performance_report(filename="dask-report-storage.html"):
    gbs = calc_fieldmean_of_first_year(ds_storage)[1]
e = time.time()
print(f"Process speed: {gbs/(e-s)} GB/s")
<xarray.DataArray 'pr' (time: 16655)>
dask.array<mean_agg-aggregate, shape=(16655,), dtype=float32, chunksize=(16,), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2020-12-31T23:30:00
Going to process: 780.703125 GBs
/sw/spack-levante/mambaforge-23.1.0-1-Linux-x86_64-3boc6i/lib/python3.10/site-packages/distributed/client.py:3106: UserWarning: Sending large graph of size 9.65 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
Process speed: 6.211303828719483 GB/s

We found that the field mean of the 780GBs of the first year was processed for

  • dask_auto:

    • in *521 chunks in 7 graph layers

    • with a speed of about 6.5GB/s (10GB/s can be considered max for a levante node)

  • dask_storage:

    • in *1041 chunks in 7 graph layers

    • with a speed of about 5.5GB/s

[29]:
result.hvplot.line()
[29]:

Adapt chunks to your analysis#

Now we ask

What is the histogram for the first 8000 cells?

The analysis goal has implications for our chunk setting. One dask chunk should contain

  • 8000 cells

  • as many timesteps as possible

Hoever, the general optimization rules for dask chunks setting say:

  • do not split up storage chunks

and

  • target 100MB dask chunks

So we can come up with sth like

[30]:
chunked_for_histogram = xr.open_dataset(
    f"/fastdata/ka1081/nextgems_cycle3/ngc3028/ngc3028_{selection['time']}_{str(selection['zoom'])}.zarr",
    engine="zarr",
    chunks=dict(time=16 * 8, cell=262144),
)[VARID]
chunked_for_histogram
[30]:
<xarray.DataArray 'pr' (time: 96480, cell: 12582912)>
dask.array<open_dataset-76e6ce1b4c49b5e250a66fdc64b44210pr, shape=(96480, 12582912), dtype=float32, chunksize=(128, 262144), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

Lets subselect

[31]:
dss = ds_auto.isel(cell=slice(0, 8000))
dss
[31]:
<xarray.DataArray 'pr' (time: 96480, cell: 8000)>
dask.array<getitem, shape=(96480, 8000), dtype=float32, chunksize=(32, 8000), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

Persist for more analysis, compute for results#

We can see that dss would fit into our memory. If we plan to run multiple different functions on it after loading, we can use .persist instead of compute. This keeps the data distributed across the workers so that a new dask job on the data is faster.

You can see in the dashboard that .persist will trigger some getitem() tasks.

[32]:
dss_persist = dss.persist()
dss_persist
[32]:
<xarray.DataArray 'pr' (time: 96480, cell: 8000)>
dask.array<getitem, shape=(96480, 8000), dtype=float32, chunksize=(32, 8000), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

dss_persist is still a dask array but a .load would return a numpy array very quickly by gathering the data from workers.

[33]:
dss_persist
[33]:
<xarray.DataArray 'pr' (time: 96480, cell: 8000)>
dask.array<getitem, shape=(96480, 8000), dtype=float32, chunksize=(32, 8000), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20T00:30:00 ... 2025-07-22
Dimensions without coordinates: cell
Attributes:
    cell_methods:  time: mean
    component:     atmo
    grid_mapping:  crs
    long_name:     precipitation flux
    units:         kg m-2 s-1
    vgrid:         surface

Multiple computes at once#

Now, we create the bins for our histogram. If we need to run .compute multiple times, we can run that together on the scheduler with dask.compute:

[34]:
# NOT:
# dsmin=dss.min().compute()
# dsmax=dss.max().compute()
# BETTER:
dsmin, dsmax = dask.compute(dss_persist.min(), dss_persist.max())
bins = np.linspace(dsmin, dsmax - (dsmax - dsmin) * 0.95, 20)

For the histogram we use xhistogram:

[35]:
def calc_histogram_for_fieldslice(dshist):
    h_x = histogram(dshist, bins=bins, dim=["time"])
    return h_x.compute(), gbs
[36]:
with performance_report(filename="dask-report-auto.html"):
    result_histogram, gbs = calc_histogram_for_fieldslice(dss_persist)
result_histogram
[36]:
<xarray.DataArray 'histogram_pr' (cell: 8000, pr_bin: 19)>
array([[95998,   159,    73, ...,     2,     5,     4],
       [96011,   161,    60, ...,     4,     3,     4],
       [96061,   121,    67, ...,     2,     4,     2],
       ...,
       [96346,    25,    12, ...,     1,     3,     1],
       [96341,    32,    17, ...,     1,     1,     1],
       [96353,    18,    18, ...,     2,     1,     1]])
Coordinates:
  * pr_bin   (pr_bin) float64 0.0001068 0.0003205 ... 0.003739 0.003953
Dimensions without coordinates: cell
[37]:
result_histogram.isel(cell=1, pr_bin=slice(1, 99)).hvplot.bar(x="pr_bin")
[37]:
[ ]: