Dask Collections: Arrays#

Tip

We recommend the official dask documentation for the interested user via https://docs.dask.org/en/stable/

Dask Collections consist of implementations for

  • Array (xarray, numpy)

  • DataFrame (tables, pandas)

  • Bag (low level dask)

  • Delayed (low level dask)

  • Futures (need a client)

This notebook

  1. only covers the dask array. A dask array can be interpreted as a collection of numpy arrays.

  2. works well on the python3/unstable kernel on DKRZ’s Jupyterhub.

  3. tries to calculate and plot quantiles of a large ensemble dataset

Dask arrays feature without distributed:

  • Parallel and larger-than-memory processing:

    • Only process one piece of the array at once - the rest is not loaded into memory yet

    • Automatic parallelization over parts of the dask array - multithreading per default

  • Lazy: Arrays are computed only when requested. High level software like xarray anticipates when it is rqeuired to evaluate an array. Until then, a Task Graph is filled.

We will work with the following packages:

[1]:
import xarray as xr
from IPython.display import Image
import dask
from dask.diagnostics import ProgressBar
import hvplot.xarray
import numcodecs
from distributed import Client
import intake

A scheme of data processing within dask (by Anaconda):

[2]:
Image(url="https://docs.dask.org/en/stable/_images/dask-overview.svg")
[2]:

Arrays#

Opening data with xarray creates dask arrays per default if you use

. Xarray will also use dask as the backend for arrays if you specify the chunks keyword argument. You can specify

  • chunks=”auto”

so that xarray returns a dask array well-configured for common data analysis.

In the following example, we work with the CMIP6 MPI Grand Ensemble. We use an intake catalog to get the data and select the air temperature ‘ta’:

[3]:
varname = "ta"
catalog = (
    "/work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/main.yaml"
)
cat = intake.open_catalog(catalog)
ds = cat["CFday"].to_dask()[[varname]]

This is what intake does under the hood i.e. the next cell returns the same ds as before:

[4]:
%%capture
dsx = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
)[[varname]]

Larger than memory

The dataset has a size of about 10TB which exceeds our available memory but we are still able to open and work with it.

[5]:
ds.nbytes / 1024**4
[5]:
9.686476768772991

One can access the array of a dataset variable with .data, e.g.:

[4]:
type(ds[varname].data)
[4]:
dask.array.core.Array
[5]:
ds[varname].data
[5]:
Array Chunk
Bytes 9.69 TiB 3.30 MiB
Shape (51, 60265, 47, 96, 192) (1, 1, 47, 96, 192)
Dask graph 3073515 chunks in 2 graph layers
Data type float32 numpy.ndarray
60265 51 192 96 47

We can see that one chunk has a size of 3.3MB.

Important

We need to ensure that a single chunk fits into memory.

Coordinates of a dataset are usually loaded into memory so that these become numpy arrays:

[6]:
type(ds["lat"].data)
[6]:
numpy.ndarray
[7]:
ds["lat"].data
[7]:
array([-88.57216851, -86.72253095, -84.86197029, -82.99894164,
       -81.13497684, -79.27055903, -77.40588808, -75.54106145,
       -73.67613231, -71.81113211, -69.94608065, -68.08099099,
       -66.21587211, -64.35073041, -62.48557052, -60.62039593,
       -58.75520927, -56.8900126 , -55.02480754, -53.15959537,
       -51.29437714, -49.4291537 , -47.56392575, -45.69869388,
       -43.83345858, -41.96822027, -40.1029793 , -38.23773599,
       -36.37249059, -34.50724334, -32.64199444, -30.77674406,
       -28.91149237, -27.0462395 , -25.18098558, -23.31573073,
       -21.45047504, -19.58521861, -17.71996153, -15.85470387,
       -13.98944571, -12.12418712, -10.25892817,  -8.39366891,
        -6.5284094 ,  -4.66314971,  -2.79788988,  -0.93262997,
         0.93262997,   2.79788988,   4.66314971,   6.5284094 ,
         8.39366891,  10.25892817,  12.12418712,  13.98944571,
        15.85470387,  17.71996153,  19.58521861,  21.45047504,
        23.31573073,  25.18098558,  27.0462395 ,  28.91149237,
        30.77674406,  32.64199444,  34.50724334,  36.37249059,
        38.23773599,  40.1029793 ,  41.96822027,  43.83345858,
        45.69869388,  47.56392575,  49.4291537 ,  51.29437714,
        53.15959537,  55.02480754,  56.8900126 ,  58.75520927,
        60.62039593,  62.48557052,  64.35073041,  66.21587211,
        68.08099099,  69.94608065,  71.81113211,  73.67613231,
        75.54106145,  77.40588808,  79.27055903,  81.13497684,
        82.99894164,  84.86197029,  86.72253095,  88.57216851])

We convert dask arrays into numpy arrays by loading them.

Loading#

The follwing functions of the dask array:

  • .load(), .compute()

  • .persist()

bring data of the dask array into physical memory. Data is not lazy anymore and fills the memory space.

We therefore need to be careful, what we actually load, because we otherwise get a memory overflow. We should only compute data if we really need it in the working environment.

[8]:
ds[varname][0, 0, 0, 0, 0].load().data
[8]:
array(245.68579102)

Important

This does not change the python object in place.

[9]:
ds[varname][0, 0, 0, 0, 0].data
[9]:
Array Chunk
Bytes 4 B 4 B
Shape () ()
Dask graph 1 chunks in 3 graph layers
Data type float32 numpy.ndarray

When to NOT use dask#

Random access i.e. fine subsetting

Opening data with Xarray can be lazy without dask. If the data of interest fits into memory, you do not necessarily need dask. You can open the dataset with chunks=None and get a lazy representation based on xarray. Note that you are working with numpy arrays in the following.

[10]:
%%capture
varname = "ta"
dsxarraylazy = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
    chunks=None,
)
[11]:
dsxarraylazy
[11]:
<xarray.Dataset>
Dimensions:          (ensemble_member: 51, time: 60265, lev: 47, bnds: 2,
                      lat: 96, lon: 192)
Coordinates:
  * ensemble_member  (ensemble_member) object 'r10i1p1f1' ... 'r9i1p1f1'
  * lat              (lat) float64 -88.57 -86.72 -84.86 ... 84.86 86.72 88.57
  * lev              (lev) float64 0.9961 0.9826 0.959 ... 4.225e-05 9.816e-06
  * lon              (lon) float64 0.0 1.875 3.75 5.625 ... 354.4 356.2 358.1
  * time             (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12-31...
Dimensions without coordinates: bnds
Data variables:
    ap               (ensemble_member, time, lev) float64 ...
    ap_bnds          (time, lev, bnds) float64 ...
    b                (ensemble_member, time, lev) float64 ...
    b_bnds           (time, lev, bnds) float64 ...
    lat_bnds         (time, lat, bnds) float64 ...
    lev_bnds         (time, lev, bnds) float64 ...
    lon_bnds         (time, lon, bnds) float64 ...
    ps               (ensemble_member, time, lat, lon) float32 ...
    ta               (ensemble_member, time, lev, lat, lon) float32 ...
    time_bnds        (time, bnds) datetime64[ns] ...
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  0.0
    cmor_version:           3.5.0
    ...                     ...
    table_id:               CFday
    table_info:             Creation Date:(09 May 2019) MD5:e6ef8ececc8f33864...
    title:                  MPI-ESM1-2-LR output prepared for CMIP6
    tracking_id:            hdl:21.14100/58c496bc-8e9d-47cb-bfd7-9b28d399e816
    variable_id:            ta
    variant_label:          r1i1p1f1
[12]:
dsxarraylazy[varname].isel(time=0, ensemble_member=0, lev=0).data
[12]:
array([[245.68579, 245.64478, 245.60571, ..., 245.79517, 245.76001,
        245.7229 ],
       [247.55103, 247.5315 , 247.51782, ..., 247.6565 , 247.61157,
        247.57642],
       [248.30298, 248.41821, 248.54517, ..., 248.0979 , 248.13696,
        248.20728],
       ...,
       [241.01587, 241.40063, 241.7893 , ..., 239.9397 , 240.27954,
        240.63892],
       [242.66235, 242.79321, 242.92993, ..., 242.30688, 242.41626,
        242.5354 ],
       [245.23657, 245.23462, 245.23462, ..., 245.2522 , 245.24634,
        245.24048]], dtype=float32)

Task Graph#

All xarray functions that can be interpreted lazily by dask will be added to dask´s task graph.

This task graph is only evaluated if needed.

[13]:
ds[varname].data.dask
[13]:

HighLevelGraph

HighLevelGraph with 2 layers and 3073516 keys from all layers.

Layer1: original

original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712

layer_type MaterializedLayer
is_materialized True
number of outputs 1

Layer2: open_dataset

open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712

layer_type Blockwise
is_materialized False
number of outputs 3073515
shape (51, 60265, 47, 96, 192)
dtype float32
chunksize (1, 1, 47, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
60265 51 192 96 47
[14]:
# 1. Only interested in first level
ds_firstlevel = ds[[varname]].isel(lev=0)
ds_firstlevel[varname].data.dask
[14]:

HighLevelGraph

HighLevelGraph with 3 layers and 6147031 keys from all layers.

Layer1: original

original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712

layer_type MaterializedLayer
is_materialized True
number of outputs 1

Layer2: open_dataset

open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712

layer_type Blockwise
is_materialized False
number of outputs 3073515
shape (51, 60265, 47, 96, 192)
dtype float32
chunksize (1, 1, 47, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on original-open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
60265 51 192 96 47

Layer3: getitem

getitem-b152f8d0f6e17c47d63d2b4f78a1252d

layer_type MaterializedLayer
is_materialized True
number of outputs 3073515
shape (51, 60265, 96, 192)
dtype float32
chunksize (1, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on open_dataset-a5f19959a3cd97754eb92c66f554a622ta-8bebb83a4a443c538a9374498cddf712
51 1 192 96 60265

For large data sets like this one, the task graph is huge because of the O(Million) chunks in the Array. So while we are creating our workflow within this task graph, we have to think about how we can do that in the optimal manner.

Optimized opening#

We can already open the dataset with chunks that fit to our analysis purpose. As we assume that we are

  • are only interested in one level

  • want to generate quantiles over the entire ensemble_member

, we can set the following chunks in the open_ function of xarray:

[15]:
%%capture
varname = "ta"
ds = xr.open_zarr(
    f"reference:://work/ik1017/CMIP6/meta/intake/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/CFday_{varname}_combined.parq",
    storage_options=dict(
        lazy=True,
        remote_protocol="file",
    ),
    consolidated=False,
    chunks=dict(ensemble_member=51, lev=1),
)
[16]:
# 1. Only interested in first level
ds_firstlevel = ds[[varname]].isel(lev=0)
ds_firstlevel[varname].data.dask
[16]:

HighLevelGraph

HighLevelGraph with 3 layers and 2892721 keys from all layers.

Layer1: original

original-open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208

layer_type MaterializedLayer
is_materialized True
number of outputs 1

Layer2: open_dataset

open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208

layer_type Blockwise
is_materialized False
number of outputs 2832455
shape (51, 60265, 47, 96, 192)
dtype float32
chunksize (51, 1, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on original-open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208
60265 51 192 96 47

Layer3: getitem

getitem-8bb71842dd54b0d0aac0e83dff0ccd61

layer_type MaterializedLayer
is_materialized True
number of outputs 60265
shape (51, 60265, 96, 192)
dtype float32
chunksize (51, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on open_dataset-746045b6ab62b368add198c45660e89cta-350432e22e272d100821c09776798208
51 1 192 96 60265

We end up with a third of the toal number of keys of the first approach.

We can further use the optimize function of dask which applies different optimization routines on the task graph:

[17]:
ds_firstlevel_optimized = dask.optimize(ds_firstlevel)
ds_firstlevel_optimized = ds_firstlevel_optimized[0]
ds_firstlevel_optimized[varname].data.dask
[17]:

HighLevelGraph

HighLevelGraph with 1 layers and 120531 keys from all layers.

Layer1: getitem

getitem-8bb71842dd54b0d0aac0e83dff0ccd61

layer_type MaterializedLayer
is_materialized True
number of outputs 120531
shape (51, 60265, 96, 192)
dtype float32
chunksize (51, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
51 1 192 96 60265

Which functions are evaluated lazily?#

Many of the numpy functions:

[18]:
daskmethods = dir(ds_firstlevel_optimized[varname].data)
numpymethods = dir(ds_firstlevel_optimized["lat"].data)
print([a for a in numpymethods if a in daskmethods])
['T', '__abs__', '__add__', '__and__', '__array__', '__array_function__', '__array_priority__', '__array_ufunc__', '__bool__', '__class__', '__complex__', '__deepcopy__', '__delattr__', '__dir__', '__divmod__', '__doc__', '__eq__', '__float__', '__floordiv__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__index__', '__init__', '__init_subclass__', '__int__', '__invert__', '__iter__', '__le__', '__len__', '__lshift__', '__lt__', '__matmul__', '__mod__', '__mul__', '__ne__', '__neg__', '__new__', '__or__', '__pos__', '__pow__', '__radd__', '__rand__', '__rdivmod__', '__reduce__', '__reduce_ex__', '__repr__', '__rfloordiv__', '__rlshift__', '__rmatmul__', '__rmod__', '__rmul__', '__ror__', '__rpow__', '__rrshift__', '__rshift__', '__rsub__', '__rtruediv__', '__rxor__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__sub__', '__subclasshook__', '__truediv__', '__xor__', 'all', 'any', 'argmax', 'argmin', 'astype', 'choose', 'clip', 'conj', 'copy', 'cumprod', 'cumsum', 'dot', 'dtype', 'flatten', 'imag', 'itemsize', 'max', 'mean', 'min', 'nbytes', 'ndim', 'nonzero', 'prod', 'ravel', 'real', 'repeat', 'reshape', 'round', 'shape', 'size', 'squeeze', 'std', 'sum', 'swapaxes', 'trace', 'transpose', 'var', 'view']

or not evaluated lazily?

[19]:
print([a for a in numpymethods if not a in daskmethods])
['__array_finalize__', '__array_interface__', '__array_prepare__', '__array_struct__', '__array_wrap__', '__class_getitem__', '__contains__', '__copy__', '__delitem__', '__dlpack__', '__dlpack_device__', '__iadd__', '__iand__', '__ifloordiv__', '__ilshift__', '__imatmul__', '__imod__', '__imul__', '__ior__', '__ipow__', '__irshift__', '__isub__', '__itruediv__', '__ixor__', '__setstate__', 'argpartition', 'argsort', 'base', 'byteswap', 'compress', 'conjugate', 'ctypes', 'data', 'diagonal', 'dump', 'dumps', 'fill', 'flags', 'flat', 'getfield', 'item', 'itemset', 'newbyteorder', 'partition', 'ptp', 'put', 'resize', 'searchsorted', 'setfield', 'setflags', 'sort', 'strides', 'take', 'tobytes', 'tofile', 'tolist', 'tostring']
[20]:
# 2.Calculate a field mean
ds_fieldmean = ds_firstlevel_optimized[[varname]].mean(dim=["lat", "lon"])
ds_fieldmean[varname].data.dask
[20]:

HighLevelGraph

HighLevelGraph with 3 layers and 241061 keys from all layers.

Layer1: getitem

getitem-8bb71842dd54b0d0aac0e83dff0ccd61

layer_type MaterializedLayer
is_materialized True
number of outputs 120531
shape (51, 60265, 96, 192)
dtype float32
chunksize (51, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
51 1 192 96 60265

Layer2: mean_chunk

mean_chunk-38f0dfc3230035e469b89cc8175f5473

layer_type Blockwise
is_materialized False
number of outputs 60265
shape (51, 60265, 96, 192)
dtype float32
chunksize (51, 1, 96, 192)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on getitem-8bb71842dd54b0d0aac0e83dff0ccd61
51 1 192 96 60265

Layer3: mean_agg-aggregate

mean_agg-aggregate-036593c2734e4aa9fbee4773a240d2d9

layer_type MaterializedLayer
is_materialized True
number of outputs 60265
shape (51, 60265)
dtype float32
chunksize (51, 1)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on mean_chunk-38f0dfc3230035e469b89cc8175f5473
60265 51
[21]:
ds_fieldmean[varname].data
[21]:
Array Chunk
Bytes 11.72 MiB 204 B
Shape (51, 60265) (51, 1)
Dask graph 60265 chunks in 3 graph layers
Data type float32 numpy.ndarray
60265 51

Xarray’s lazy functions:#

E.g. groupby operation is lazy:

[22]:
# 3. Yearly means:
ds_fieldmean_ym = ds_fieldmean.groupby("time.year").mean()
ds_fieldmean_ym
[22]:
<xarray.Dataset>
Dimensions:          (year: 165, ensemble_member: 51)
Coordinates:
  * ensemble_member  (ensemble_member) object 'r10i1p1f1' ... 'r9i1p1f1'
    lev              float64 0.9961
  * year             (year) int64 1850 1851 1852 1853 ... 2011 2012 2013 2014
Data variables:
    ta               (year, ensemble_member) float32 dask.array<chunksize=(1, 51), meta=np.ndarray>

We now got

Dask graph     165 chunks in 1324 graph layers

so that displaying the underlying task graph would end in troubles so we better not do it.

Make custom numpy operations lazy#

If we have a package or function that works only on numpy arrays, we can use apply_ufunc or map_blocks to apply it on the dask array lazily. This is rather easy if the shape of the array does not change. If a reduction takes place, it becomes more and more complex and more kwargs has to be provided.

One simple example would be lossy compression. We can lossy compress all chunks with numcodecs by using only 12 mantissa bits:

[23]:
def compress_data(partds):
    import numcodecs

    rounding = numcodecs.BitRound(keepbits=12)
    return rounding.decode(rounding.encode(partds))
[24]:
ds_fieldmean_ym_lossy = xr.apply_ufunc(
    compress_data, ds_fieldmean_ym, dask="parallelized"
)

We now continue with creating quantiles. We could do that in one step however we would like to save each quantile as a new data array in the data set so we use a for loop:

[25]:
# 3. Quantiles:
quantiles = [0.01, 0.25, 0.5, 0.75, 0.99]
quantiles_str = [str(a) for a in quantiles]
for sq, q in zip(quantiles_str, quantiles):
    ds_fieldmean_ym[sq] = ds_fieldmean_ym[varname].quantile(q, dim="ensemble_member")
quantiles = ds_fieldmean_ym[quantiles_str]

Rechunking#

In general, you should already open the data with the chunk setting you need for your analysis.

Some functions like .quantile require that their core dimension is exactly one chunk of data. If you are in that situation, you can rechunk the data with .chunk to what you need.

Only a xarray dataset configured as follows can be used for .quantile:

[26]:
ds_fieldmean_ym_rechunked = ds_fieldmean_ym.chunk(ensemble_member=51)

Schedule#

We finally reached the end point of our workflow. We can see that there are still a lot of task layers. This is probably because the years are calculated after each other.

We can check if dask.optimize can fuse these tasks.

[27]:
quantiles_optimized = dask.optimize(quantiles)
quantiles_optimized = quantiles_optimized[0]
quantiles_optimized
[27]:
<xarray.Dataset>
Dimensions:   (year: 165)
Coordinates:
    lev       float64 0.9961
  * year      (year) int64 1850 1851 1852 1853 1854 ... 2010 2011 2012 2013 2014
    quantile  float64 0.01
Data variables:
    0.01      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.25      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.5       (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.75      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.99      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>

To test, whether our workflow can be computed in a sufficient amount of time, we can compute a small subset like a year.

Note that there is also the ProgressBar which gives us updates on the status of the calculations:

[28]:
quantiles_test = quantiles_optimized.isel(year=slice(0, 2))
quantiles_test
[28]:
<xarray.Dataset>
Dimensions:   (year: 2)
Coordinates:
    lev       float64 0.9961
  * year      (year) int64 1850 1851
    quantile  float64 0.01
Data variables:
    0.01      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.25      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.5       (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.75      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
    0.99      (year) float64 dask.array<chunksize=(1,), meta=np.ndarray>
[39]:
with ProgressBar():
    quantiles_test = quantiles_test.compute()
[########################################] | 100% Completed | 115.12 s

We can see that our two years take 100s to be finalized. Extrapolation gives 5000s / 60s = 100min for the entire array.

At this stage, we could set-up a cluster with a client (see distributed) to submit the work to a distributed system.

[42]:
client = Client()
client
[42]:

Client

Client-9a26239b-d571-11ee-b086-080038c049a7

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: /proxy/8787/status

Cluster Info

[ ]:
quantiles_optimized = quantiles_optimized.compute()

Plot#

To plot the quantiles, we use hvplot.xarray and the following combinations of area and line plots:

[40]:
def plot_quantiles_area(quantiles, ylabel):
    ninetyeight = quantiles.hvplot.area(
        grid=True,
        y="0.01",
        y2="0.99",
        width=820,
        color="aliceblue",
        label="98% of all values",
    )
    fifty = quantiles.hvplot.area(
        grid=True,
        y="0.25",
        y2="0.75",
        width=820,
        color="aqua",
        label="50% of all values",
    )
    median = quantiles.hvplot.line(
        grid=True, y="0.5", color="cornflowerblue", label="median"
    )
    comb = (ninetyeight * fifty * median).opts(
        ylabel=ylabel, legend_position="bottom_right"
    )
    return comb
[44]:
plot_quantiles_area(quantiles_optimized, varname)
[44]:
[ ]: