Retrieval from DKRZ tape archive#

This notebook explains how to efficiently retrieve data from the DKRZ tape archive with intake version 1 and slkspec, using EERIE data as an example. The notebook works well within the /work/bm1344/conda-envs/py_312/ environment.

Advantages of intake retrievals compared to other options such as command-line slk retrievals:

  • Opening the data from the catalog, doing coordinate look-ups, and preparing workflows is always possible without any retrieval and for free.

  • The same convenient and familiar ds.load() command gets the data; retrievals are included.

  • Optimized background retrieval management, e.g. by tape grouping.

  • “Recalls” can be submitted asynchronously so that you can resume work later. Just rerun the script; it can only become faster.

  • Catalog configurations specify a shared and mostly quota-free Levante scratch cache for tape retrievals, which prevents duplicates on disk and therefore quota issues.

Workflow and speed#

Subsetting#

When you open a dataset from the intake archive catalog introduced here, you can still browse and subset by coordinates and prepare dask workflows just as you know it from other catalogs. As soon as dask actually runs data tasks, e.g. when you call .compute() or .load(), the retrieval workflow is triggered. It is therefore particularly important for these archive catalogs that you subset the data before you compute. Otherwise, you will submit retrievals of far too much data, which will not only take forever but may also overload the underlying tape system.
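
A minimal sketch of this pattern (ds stands for a dataset opened from the archive catalog, as shown later in this notebook; the variable and time range are arbitrary examples):

# browsing, subsetting and graph building are lazy and free, nothing is retrieved yet
subset = ds[["tas"]].sel(time=slice("1991-01", "1991-03"))
# only this step triggers the retrieval workflow
subset = subset.load()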

Data flow and speed#

The data flow from tape to your computer’s memory passes through several stages:

  1. tape->tapecache: This transfer is called a recall. It takes about O(30min) per tape, with large fluctuations depending on the archive load. Three tape recalls in parallel are allowed per user. Each EERIE dataset is usually distributed across O(5) tapes.

  2. tapecache->levante: This transfer is called a retrieve. It takes about O(1min).

  3. levante->memory: The final load.

For experts, we collect more info in this pad.

Example: EERIE data in archive#

A key benefit of catalogs is that you do not need to know where and how the data is stored. For completeness, and because we can use this information to optimize our workflow, we explain how EERIE datasets in the archive are organized depending on their size:

  • Small: five years per file if five years of the full dataset are < 1GB

  • Medium: one year per file if one year is <100GB and a “time” dimension exists

  • Large: one year of one variable per file if one year of one variable is <100GB

  • XL: one month of one variable per file in all other cases

The root directories for the output of the German ESM contributions are (you can browse them directly with slk list, as sketched after the list):

  • ICON-ESM-ER: /arch/bm1344/ICON/outdata/

  • IFS-FESOM2-SR: /arch/bm1344/IFS-FESOM2/outdata/
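
If you want to inspect these archive paths directly (outside of intake), you can list them with the slk command line client from a notebook cell on Levante, for example:

!slk list /arch/bm1344/ICON/outdata/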

Small retrievals#

If you aim to retrieve data volumes of O(<=10GB), you can forget how EERIE data is archived and just work with the datasets as if they were on disk: call .load() on your subset. You will have to wait for about an hour until the command finishes and you can work with the data. A complete small-retrieval sketch follows at the end of this section.

import intake

catalog = (
    "https://raw.githubusercontent.com/eerie-project/intake_catalogues/main/dkrz/archive/main.yaml"
)
#catalog="/work/bm1344/DKRZ/intake_catalogues/dkrz/archive/main.yaml"
eerie_cat = intake.open_catalog(catalog)
eerie_cat
main:
  args:
    path: https://raw.githubusercontent.com/eerie-project/intake_catalogues/main/dkrz/archive/main.yaml
  description: ''
  driver: intake.catalog.local.YAMLFileCatalog
  metadata: {}
def find_data_sources(catalog,name=None):
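    # Recursively collect the fully qualified (dot-separated) names of all data sources in a nested catalog.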
    newname='.'.join(
        [ a 
         for a in [name, catalog.name]
         if a
        ]
    )
    data_sources = []

    for key, entry in catalog.items():
        if isinstance(entry, intake.catalog.Catalog):
            if newname == "main":
                newname = None
            # If the entry is a subcatalog, recursively search it
            data_sources.extend(find_data_sources(entry, newname))
        elif isinstance(entry, intake.source.base.DataSource):
            if key.endswith('.nc'):
                continue
            if newname:
                data_sources.append(newname+"."+key)
            else:
                data_sources.append(key)

    return data_sources
all_sources=find_data_sources(eerie_cat)
all_sources
['model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_1h_inst',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_1h_mean',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_3h_inst',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_6h_inst',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_6h_mean',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_daily_max',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_daily_mean',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.2d_daily_min',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.model-level_daily_mean_1',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.model-level_daily_mean_2',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.atmos.native.pl_6h_inst',
 'model-output.icon-esm-er.eerie-spinup-1950.v20240618.ocean.native.model-level_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_1h_inst',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_1h_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_3h_inst',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_6h_inst',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_6h_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_daily_max',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_daily_min',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_monthly_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.atmos_native_mon',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.model-level_daily_mean_1',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.model-level_daily_mean_2',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.model-level_monthly_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.pl_6h_inst',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.2d_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.2d_daily_square',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.2d_monthly_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.2d_monthly_square',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.5lev_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.eddy_monthly_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.model-level_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.model-level_monthly_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.ocean.native.ocean_native_mon',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.land.native.2d_daily_mean',
 'model-output.icon-esm-er.eerie-control-1950.v20240618.land.native.2d_monthly_mean']
dscat = eerie_cat["model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_daily_mean"](chunks={})
dscat
2d_daily_mean:
  args:
    chunks: {}
    consolidated: false
    storage_options:
      lazy: true
      remote_options:
        slk_cache: /scratch/k/k202134/INTAKE_CACHE
      remote_protocol: slk
    urlpath: reference:://work/bm1344/DKRZ/kerchunks_pp_batched/ICON/eerie-control-1950/v20240618/atmos_native_2d_daily_mean_slk.parq
  description: ''
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/eerie-project/intake_catalogues/main/dkrz/archive/model-output/icon-esm-er/eerie-control-1950/v20240618/atmos/native

The slk_cache entry denotes the location on Levante to which the data is retrieved. Per the intake configuration, data is stored in a shared, additional scratch cache. Data untouched for two weeks is automatically deleted from that location.

Make sure you have write permissions for the shared cache directory. If not, you can provide another location via remote_options, similar to how we overrode chunks in the cell above.
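
A minimal sketch of such an override, assuming that keyword arguments passed to the catalog entry replace the corresponding source arguments (as the chunks override above suggests); the cache path is a placeholder for a directory you can write to:

my_cache = "/scratch/<initial>/<userid>/INTAKE_CACHE"  # placeholder path, adapt to your account
dscat = eerie_cat["model-output.icon-esm-er.eerie-control-1950.v20240618.atmos.native.2d_daily_mean"](
    chunks={},
    storage_options={
        "lazy": True,
        "remote_protocol": "slk",
        "remote_options": {"slk_cache": my_cache},  # retrievals are cached here instead of the shared cache
    },
)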

shared_cache=dscat.describe()["args"]["storage_options"]["remote_options"]["slk_cache"]
shared_cache
'/scratch/k/k202134/INTAKE_CACHE'
ds=dscat.to_dask()
ds
/work/bm1344/conda-envs/py_312/lib/python3.12/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  'dims': dict(self._ds.dims),
<xarray.Dataset> Size: 8TB
Dimensions:             (ncells: 5242880, time: 18262, height: 1, height_2: 1,
                         height_3: 1)
Coordinates:
  * height              (height) float64 8B 2.0
  * height_2            (height_2) float64 8B 10.0
  * height_3            (height_3) float64 8B 90.0
    lat                 (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
    lon                 (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
  * time                (time) datetime64[ns] 146kB 1991-01-01T23:59:59 ... 2...
Dimensions without coordinates: ncells
Data variables: (12/22)
    cell_sea_land_mask  (ncells) int32 21MB dask.array<chunksize=(5242880,), meta=np.ndarray>
    clt                 (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    evspsbl             (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hfls                (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hfss                (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hur                 (time, height_3, ncells) float32 383GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
    ...                  ...
    rsus                (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    sfcwind             (time, height_2, ncells) float32 383GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
    tas                 (time, height, ncells) float32 383GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
    ts                  (time, ncells) float32 383GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    uas                 (time, height_2, ncells) float32 383GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
    vas                 (time, height_2, ncells) float32 383GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>

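Putting this together, a small retrieval on the dataset opened above could look like the following sketch (variable and time range are arbitrary example choices):

small = ds["tas"].sel(time=slice("1991-01-01", "1991-01-07"))  # a few days of one variable
print("Uncompressed size: " + str(small.nbytes / 1024**3) + " GB")  # check the volume before loading
small = small.load()  # triggers recall, retrieve and load; can take up to about an hour
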
Large retrievals#

We can consider retrievals of volumes > 10GB as “large”. We will use the information on how the data is stored to optimize our retrieval. We first select

  • variables

  • months

to subset precisely at file-level granularity.

varsel=["clt","hur"]
timesel=slice("1991","1992")
subset=ds[varsel].sel(time=timesel)
subset
<xarray.Dataset> Size: 31GB
Dimensions:   (time: 731, ncells: 5242880, height_3: 1)
Coordinates:
  * height_3  (height_3) float64 8B 90.0
    lat       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
    lon       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
  * time      (time) datetime64[ns] 6kB 1991-01-01T23:59:59 ... 1992-12-31T23...
Dimensions without coordinates: ncells
Data variables:
    clt       (time, ncells) float32 15GB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hur       (time, height_3, ncells) float32 15GB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
print("Uncompressed size: "+str(subset.nbytes/1024**3)+ " GB")
Uncompressed size: 28.632817953824997 GB

Recalls#

If you do a retrieval for the first time, the data is likely only available on tape. In that case, you should create a virtual for_recall dataset that contains exactly one chunk per file (i.e. per month and variable) for efficient recalls. For that, we only use sel (resample does not work with dask) and then optimize the dataset with dask.

# Assumes that there is always a time stamp at the end of the month (true for all EERIE data)
monthends=subset.sel(time=subset.time.dt.is_month_end)
# Assumes that there is always a time stamp at hour 23 (true for most EERIE data)
for_recall=monthends.where(
    ( monthends.time.dt.hour==23 ),
    drop=True
)
for_recall
<xarray.Dataset> Size: 1GB
Dimensions:   (time: 24, ncells: 5242880, height_3: 1)
Coordinates:
  * height_3  (height_3) float64 8B 90.0
    lat       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
    lon       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
  * time      (time) datetime64[ns] 192B 1991-01-31T23:59:59 ... 1992-12-31T2...
Dimensions without coordinates: ncells
Data variables:
    clt       (time, ncells) float32 503MB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hur       (time, height_3, ncells) float32 503MB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>
import dask
for_recall=dask.optimize(for_recall)[0]
for_recall
<xarray.Dataset> Size: 1GB
Dimensions:   (time: 24, ncells: 5242880, height_3: 1)
Coordinates:
  * height_3  (height_3) float64 8B 90.0
    lat       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
    lon       (ncells) float64 42MB dask.array<chunksize=(5242880,), meta=np.ndarray>
  * time      (time) datetime64[ns] 192B 1991-01-31T23:59:59 ... 1992-12-31T2...
Dimensions without coordinates: ncells
Data variables:
    clt       (time, ncells) float32 503MB dask.array<chunksize=(1, 5242880), meta=np.ndarray>
    hur       (time, height_3, ncells) float32 503MB dask.array<chunksize=(1, 1, 5242880), meta=np.ndarray>

From the resulting dataset for_recall we can see that 2 (number of variables) times 24 (number of monthly chunks) files need to be recalled.

no_of_files=len(varsel)*len(for_recall["clt"].chunks[0])

It is optimal to submit as many file requests as possible at once. Therefore, we configure the dask scheduler such that there are enough threads to assign each file to its own thread.

from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(no_of_files)):
    for_recall.compute()
  • This compute call can take a long time, depending on the load of the archive.

  • If you stop the command, the recalls will still finish in the background. You can close this session, return the next day, and start working on the now-cached data.

In case of issues, have a look at the slk log in $HOME/.slk/slk-cli.log. It may contain reports of the retrievals that help our tape admins find a solution.

!tail $HOME/.slk/slk-cli.log
2025-03-18 15:34:19 l40091.lvt.dkrz.de 1854022 INFO  Executing command: "version"
2025-03-18 15:49:10 l40091.lvt.dkrz.de 1854628 INFO  Executing command: "version"
2025-03-18 15:51:23 l40091.lvt.dkrz.de 1855021 INFO  Executing command: "version"
2025-03-18 15:53:49 l40091.lvt.dkrz.de 1855399 INFO  Executing command: "version"
2025-03-18 15:57:23 l40091.lvt.dkrz.de 1855713 INFO  Executing command: "version"
2025-03-19 09:36:49 l40192.lvt.dkrz.de 340304 INFO  Executing command: "list /arch/bm1344/IFS-FESOM/outdata/"
2025-03-19 09:36:50 l40192.lvt.dkrz.de 340304 ERROR Namespace ID or Children of Namespace not found in list command.
2025-03-19 09:36:54 l40192.lvt.dkrz.de 340460 INFO  Executing command: "list /arch/bm1344/IFS-FESOM2/outdata/"
2025-03-19 09:41:15 l40192.lvt.dkrz.de 340869 INFO  Executing command: "version"
2025-03-19 11:14:19 l40192.lvt.dkrz.de 636839 INFO  Executing command: "version"