---
file_format: mystnb
kernelspec:
  language: python
  name: python3
  display_name: Python 3
execution:
  timeout: 120
---

# The km-scale cloud by DKRZ

The [km-scale cloud](https://km-scale-cloud.dkrz.de), formerly known as EERIE cloud, addresses data users who cannot work on DKRZ's High Performance Computer Levante next to the data. It provides open, efficient, Zarr-based access to high-resolution ESM datasets produced by

* projects like EERIE and NextGEMs
* models like ICON and IFS-FESOM.

+++

Whether originally stored as Zarr, NetCDF, or GRIB, the km-scale cloud delivers byte ranges of the original files as Zarr chunks, without rewriting, relocating, or converting them. In addition, it incorporates Dask for on-demand, server-side data reduction tasks such as compression and the computation of summary statistics. This combination makes it practical and scalable to work with petabyte-scale climate data even for users with limited compute resources. Integrated support for catalog tools like STAC (SpatioTemporal Asset Catalog) and intake, organized in a semantic hierarchy, allows users to discover, query, and subset data efficiently through standardized, interoperable interfaces.

This is enabled by the python package [cloudify](https://gitlab.dkrz.de/data-infrastructure-services/cloudify), described in detail in [Wachsmann, 2025](https://doi.org/10.35095/WDCC/Overview_Cloudify).

+++

## Showcase: Time series quickplot for an ICON simulation

```{code-cell} ipython3
import xarray as xr

dataset_id = "icon-esm-er.highres-future-ssp245.v20240618.atmos.native.mon"
ds = xr.open_dataset(
    f"simplecache::https://km-scale-cloud.dkrz.de/datasets/{dataset_id}/kerchunk",
    engine="zarr",
    chunks="auto",
    zarr_format=2,
    # if you use zarr v3:
    # storage_options=dict(
    #     simplecache=dict(asynchronous=True),
    #     https=dict(asynchronous=True)
    # )
)
```

```{code-cell} ipython3
import matplotlib.pyplot as plt

var = "tas_gmean"
vmean = ds[var].squeeze()
vmean_yr = vmean.resample(time="YE").mean()
vmean.plot()
vmean_yr.plot()
plt.title(
    " ".join(dataset_id.split("."))
    + f"\ntime series of monthly and yearly mean {var}"
)
plt.xlabel("Year")
```

## Km-scale datasets and how to approach them

The km-scale-cloud provides many *endpoints* (URIs) with different features. All available endpoints are documented under [this link](https://km-scale-cloud.dkrz.de/docs).

### Helpful dataset endpoints

A key endpoint is the `/datasets` endpoint, which provides a simple *list of datasets* accessible through the km-scale-cloud. We can use it like:

```{code-cell} ipython3
import requests

kmscale_uri = "https://km-scale-cloud.dkrz.de"
kmscale_datasets_uri = kmscale_uri + "/datasets"
kmscale_datasets = requests.get(kmscale_datasets_uri).json()
print(f"The km-scale-cloud provides {len(kmscale_datasets)} datasets such as:")
print(kmscale_datasets[0])
```

For each dataset, an *Xarray-dataset view* endpoint exists, which is helpful to get an overview of the dataset's content. The endpoint is constructed like `datasets/dataset_id/`, e.g.:

```{code-cell} ipython3
dataset_pattern = "atmos.native.mon"
global_atm_mon_mean_ds_name = next(
    a for a in kmscale_datasets if dataset_pattern in a
)
print(
    "An available atmospheric global mean monthly mean dataset is: "
    + global_atm_mon_mean_ds_name
)
global_atm_mon_mean_ds_uri = kmscale_datasets_uri + "/" + global_atm_mon_mean_ds_name
```

The key endpoint is the default Zarr endpoint, constructed like `datasets/DATASET_ID/kerchunk`. Such an address can be used by any tool that can read Zarr over HTTP.
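As a minimal sketch of this tool-independence, the same endpoint can be read with the plain `zarr` and `fsspec` packages. This is not specific to the km-scale-cloud API; the exact store handling differs between zarr-python 2 and 3, and the variable name `tas_gmean` is taken from the showcase above:

```python3
# Minimal sketch: read the /kerchunk endpoint with plain zarr + fsspec.
# Any tool that understands Zarr over HTTP works; details depend on your zarr version.
import fsspec
import zarr

mapper = fsspec.get_mapper(global_atm_mon_mean_ds_uri + "/kerchunk")
group = zarr.open_group(mapper, mode="r")
print(list(group.array_keys()))   # variables exposed as Zarr arrays
print(group["tas_gmean"].shape)   # inspect metadata without loading data
```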
For example, with Xarray:

```{code-cell} ipython3
xr.open_dataset(
    global_atm_mon_mean_ds_uri + "/kerchunk",
    engine="zarr"
)
```

### The Datatree: All in one Zarr group

+++

**Note: `xarray>=2025.7.1` is required, and the Datatree interface is evolving fast.**

+++

Opening Zarr and representing it in Xarray does not cost many resources: it is fast and small in memory. A strategy to work with multiple datasets is therefore to just **open everything**. We can access the full >10 PB of the km-scale-cloud using only `xarray` with the following code:

```{code-cell} ipython3
dt = xr.open_datatree(
    "simplecache::https://km-scale-cloud.dkrz.de/datasets",
    engine="zarr",
    zarr_format=2,
    chunks=None,
    create_default_indexes=False,
    decode_cf=False,
)
```

The `dt` object is an [Xarray Datatree](https://docs.xarray.dev/en/latest/user-guide/data-structures.html#datatree). This datatree object allows you to browse and discover the full content of the km-scale-cloud, similar to the functionality of [intake](https://intake.readthedocs.io/en/latest/) but without requiring an additional tool.

The workflow using the datatree involves the following steps:

1. `filter`: subset the tree so that it only includes the datasets you want. You can apply any custom function that takes the datasets as input. In the following example, we look for a specific dataset name.
2. `chunk`, `decode_cf`: if you need coordinate values for subsetting, or if you are interested in more than one chunk, use these functions to allow lazy access to coordinates.
3. Instead of writing loops over a list or a dictionary of datasets, you can now use the `.map_over_datasets` function to apply a function to *all* datasets at once.

```{code-cell} ipython3
path_filter = "s2024-08-10"
filtered_tree = dt.filter(lambda node: path_filter in node.path)
display(filtered_tree)
```

```{code-cell} ipython3
filtered_tree_chunked = filtered_tree.chunk()
filtered_tree_chunked_decoded = filtered_tree_chunked.map_over_datasets(
    lambda ds: xr.decode_cf(ds) if "time" in ds else None
)
```

Any `.compute()` or `.load()` will trigger data retrieval. Thus, make sure you **first subset** before you download. The `.nbytes` attribute shows you how much *uncompressed* data will be loaded into your memory.

```{code-cell} ipython3
filtered_tree_chunked_decoded.nbytes / 1024**2
```

```{code-cell} ipython3
time_sel = "2024-08-10"
var = "tas"
filtered_tree_chunked_decoded_subsetted = filtered_tree_chunked_decoded.map_over_datasets(
    lambda ds: ds[[var]].sel(time=time_sel)
    if all(a in ds for a in [var, "time"])
    else None
).prune()
print(filtered_tree_chunked_decoded_subsetted.nbytes / 1024**2)
```

Before running `.load()`, add all lazy functions to dask's task graph:

```{code-cell} ipython3
dt_workflow = filtered_tree_chunked_decoded_subsetted.map_over_datasets(
    lambda ds: ds.mean(dim="time") if "time" in ds else None
)
```

```{code-cell} ipython3
%%time
ds_mean = dt_workflow.compute().leaves[0].to_dataset()
ds_mean
```
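A quick, grid-independent way to inspect the result is a histogram of the daily-mean field. This is a minimal sketch assuming the datatree workflow above completed successfully; `var` and `time_sel` are the names defined in the subsetting step:

```python3
# Minimal sketch (assumes ds_mean from the datatree workflow above):
# a histogram is grid-agnostic and therefore also works on the
# unstructured native ICON grid.
import matplotlib.pyplot as plt

ds_mean[var].plot.hist(bins=50)
plt.title(f"Distribution of daily-mean {var} on {time_sel}")
plt.show()
```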
## STAC interface

The most interoperable *catalog* interface for the km-scale-cloud is [STAC](https://stacspec.org/en) (SpatioTemporal Asset Catalogs). You can use any [stac-browser implementation](https://radiantearth.github.io/stac-browser/#/) to [browse and discover](https://discover.dkrz.de/external/km-scale-cloud.dkrz.de/stac-collection-all.json) the dynamically created [km-scale cloud collection](https://km-scale-cloud.dkrz.de/stac-collection-all.json) with all datasets linked as dynamic items. For programmatic access, follow this notebook.

```{code-cell} ipython3
import pystac

kmscale_collection_uri = "https://km-scale-cloud.dkrz.de/stac-collection-all.json"
kmscale_collection = pystac.Collection.from_file(kmscale_collection_uri)
kmscale_collection
```

Each *child* link in the collection corresponds to a dataset and is formatted as a STAC item. Datasets of the km-scale-cloud are highly aggregated. We can do:

```{code-cell} ipython3
dataset_items = [
    pystac.Item.from_file(link.href)
    for link in kmscale_collection.get_links("child")
]
item = dataset_items[0]
item
```

We can use properties or IDs to browse and search the items:

```{code-cell} ipython3
selection = [
    a for a in dataset_items
    if all(b in a.id.lower() for b in ["icon", "gr025", "monthly", "2d", "hist", "atmos"])
]
item = selection[0]
len(selection)
```

Each dataset **item** has multiple assets corresponding to different ways of opening the dataset; e.g. the *"dkrz-disk"* asset should be used when working on Levante. We now use the *"eerie-cloud"* asset. It provides information on volume and number of variables for the uncompressed, total dataset.

```{code-cell} ipython3
asset = "eerie-cloud"
ec_asset = item.get_assets()[asset]
ec_asset_dict = ec_asset.to_dict()
ec_asset
```

We can open datasets with the asset:

```{code-cell} ipython3
import xarray as xr

zarr_uri = ec_asset_dict["href"]
xr.open_dataset(
    zarr_uri,
    **ec_asset_dict["xarray:open_kwargs"],
    storage_options=ec_asset_dict.get("xarray:storage_options")
)
```

## Advanced: Retrieval settings

Before we start *excessively* transferring data chunks, we need to understand how the km-scale cloud limits requests, so that we do not run into rejected ones. The km-scale-cloud aims to serve data to multiple users at once. Therefore, it allows 200 requests per client IP before it rejects new requests with a 429 error. It also only allows 60 requests per dataset. On the client side, the Python Zarr library that we use for access is embarrassingly parallel. Under the hood, it uses the aiohttp client, which limits the number of simultaneous requests to 100 per session. Thus, with default settings, you can only retrieve data from **2 datasets** (2 * 100) at once!

Moreover, it makes sense to configure the client so that it issues fewer requests at once. A reasonable number of concurrent requests depends on

- your network bandwidth
- the chunk size of the target data

For some high-resolution data (GRIB formatted), one chunk can be *large* (>>10 MB). In that case, a few parallel chunk requests are already enough to fill your network capacity. This is how parallel requests are limited to `2`:

```python3
import aiohttp

so_limited = {
    "client_kwargs": {
        "connector": aiohttp.TCPConnector(limit=2),
    }
}
xr.open_dataset(
    zarr_uri,
    **ec_asset_dict["xarray:open_kwargs"],
    storage_options=so_limited
)
```

## Advanced: Two Zarr versions

The km-scale-cloud provides **two** Zarr access endpoints for the same datasets, which differ:

- **/kerchunk** suffix: The *preferred* href delivers data as stored in the original, without any processing on the server. Original Zarr data, e.g. NextGEMs ICON, is just passed through to the user.
- **/zarr** suffix: The *alternate* href delivers data processed on the server side with *dask*. The resulting data is
  - rechunked into large, dask-optimal chunk sizes (O(100 MB) in memory)
  - uniformly compressed with *blosc*. Native data is lossily compressed with 12-bit bitrounding.

*kerchunk* has a smaller footprint in both memory and compute on the server. *zarr* reduces the data volume before transfer to support users with limited network bandwidth.
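As a minimal sketch of the difference, the same dataset can be opened through both endpoints and the resulting chunking compared. This reuses `global_atm_mon_mean_ds_uri` from above and assumes both endpoints exist for that dataset:

```python3
# Minimal sketch: open the same dataset via both Zarr endpoints and compare
# the chunking that the client sees. /kerchunk exposes the original layout,
# /zarr the server-side rechunked, blosc-compressed variant.
import xarray as xr

ds_kerchunk = xr.open_dataset(
    global_atm_mon_mean_ds_uri + "/kerchunk",
    engine="zarr", chunks={}, zarr_format=2,  # zarr_format=2 as in the showcase
)
ds_zarr = xr.open_dataset(
    global_atm_mon_mean_ds_uri + "/zarr",
    engine="zarr", chunks={}, zarr_format=2,
)
print(dict(ds_kerchunk.chunks))  # original chunk layout
print(dict(ds_zarr.chunks))      # server-side rechunked layout
```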
+++

## Intake

```{warning}
The Intake catalog is a legacy interface and requires `intake<2`.
```

+++

The server dynamically creates an intake catalog which is *synced* with the available datasets.

```{code-cell} ipython3
import intake

cat = intake.open_catalog("https://eerie.cloud.dkrz.de/intake.yaml")
```

We can list all datasets available in the catalog with `list`:

```{code-cell} ipython3
all_dkrz_datasets = list(cat)
len(all_dkrz_datasets)
```

The two Zarr access points for each dataset are given in the description:

```{code-cell} ipython3
dsname = all_dkrz_datasets[-1]
cat[dsname].describe()
```

`to_dask()` opens the dataset:

```{code-cell} ipython3
cat[dsname].to_dask()
```

## Environment

To successfully run this notebook, you need a kernel / environment with the following packages installed:

```
- xarray
- pystac
- zarr
- dask
- hvplot # for plotting
- aiohttp
- requests
- intake-xarray
- intake<2
- eccodes # for grib data
- python-eccodes # for grib data
```
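A minimal sketch to check whether the current kernel satisfies this list; the module names below are the import names assumed to correspond to the packages above (`intake<2` is a version constraint, not an import name):

```python3
# Minimal sketch: verify that the required packages can be imported in the
# current kernel. Import names are assumed to map to the packages listed above.
import importlib

for module in ["xarray", "pystac", "zarr", "dask", "hvplot", "aiohttp",
               "requests", "intake", "intake_xarray", "eccodes"]:
    try:
        importlib.import_module(module)
        print(f"{module}: OK")
    except ImportError as err:
        print(f"{module}: MISSING ({err})")
```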