Intake V - create intake-esm catalog from scratch#

Agenda

In this part, you learn

  1. When to build intake-esm collections

  2. How to create a standardized intake-esm catalog from scratch

    1. How to equip the catalog with attributes and configurations for assets and aggregation

    2. How to add the collection of assets to the catalog

  3. How to validate and save the newly created catalog

  4. How to configure the catalog to process multivariable assets

Intake is a cataloging tool for data repositories. It opens catalogs with drivers. Drivers can be plug-ins like intake-esm.

This tutorial gives insight into the creation of intake-esm catalogs. We recommend this specific driver for intake when working with ESM data, as the plugin allows you to load the data with the widely used and accepted tool xarray.

Note

This tutorial creates a catalog from scratch. If you work based on another catalog, it might be sufficient for you to look into intake II - save subset

1. When should I create an intake-esm catalog?#

Cataloging your data set with a static catalog for easy access is beneficial if

  • the data set is stable 🏔 such that you do not have to update the content of the catalog to make it usable at all

  • the data set is very large 🗃 such that browsing and accessing data via file system is less performant

  • the data set should be shared 🔀 with many people such that you cannot use a database format

2. Create an intake-esm catalog which complies with esmcat-specs#

In order to create a well-defined, helpful catalog, you have to answer the following questions:

  • What should be the search facets of the catalog?

  • How are assets of the catalog combined into a dataset?

  • How should xarray open the data set?

For intake-esm catalogs, an early standard has been developed to ensure compatibility across different intake-esm catalogs. We will follow those specs in this tutorial.

In the code example, we will use a python dictionary, but you could also write directly into a file with your favorite editor. We start with a catalog dictionary intake_esm_catalog and add the required basic metadata:

intake_esm_catalog={
    # we follow the esmcat specs version 0.1.0: 
    'esmcat_version': '0.1.0',
    'id': 'Intake-esmI',
    'description': "This is an intake catalog created for the intake tutorial"
}

2.1. Create the description#

The description contains all the metadata which is necessary to understand the catalog. That makes the catalog self-descriptive. It also tells intake how to load the assets of the data set(s) with the specified driver.

Define attributes of your catalog#

The catalog’s collection uses attributes to describe the assets. These attributes are defined in the description via python dictionaries and given as a list in the intake-esm catalog .json file, e.g.:

"attributes": [
    {
      "column_name": "activity_id",
      "vocabulary": "https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json"
    },
]

Users can access them from the loaded catalog variable catalog via:

catalog.esmcat.attributes

Catalog’s attributes should allow users to

  • effectively browse:

    • The in-memory representation and visualization tool for the catalog is a Pandas DataFrame. The column_name of each attribute defines a column of that DataFrame.

    • Additionally, the column_name of the catalog’s attributes can be used as search facets - they will be keyword arguments of the catalog.search() function

  • understand the content: You can provide additional information with the attributes, e.g. by specifying a vocabulary of all available (or allowed) values of the attribute.

➡ The collection must have values for all defined attributes (see 3.2.)

➡ In other terms: If assets should be integrated into the catalog, they have to be described with these attributes.

Best Practice

  • The best configuration is reached if all datasets can be uniquely identified. I.e., if the users fill out all search facets, they will end up with only one dataset.

  • Do not overdo it with additional columns. Users may be confused when many search fields have similar meanings. Also, the display of the DataFrame should fit into the window width.

Use case: Catalog for project data on a file system

Given a directory tree with more than one level, ensure that:

  • All files are on the same and deepest directory level.

  • Each directory level has the same meaning across the project data. E.g., the deepest directory level can have the meaning version.

This can easily be done by creating a directory structure template and checking the directories against it, as sketched below.
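A minimal sketch of such a check (the project root path is a placeholder and an assumption; adjust it to your data):

import os

directory_structure_template="mip_era/activity_id/institution_id/source_id/experiment_id/member_id/table_id/variable_id/grid_label/version"
expected_depth=len(directory_structure_template.split('/'))

project_root="/work/ik1017/CMIP6/data"  # placeholder project root
violations=[]
for dirpath, dirnames, filenames in os.walk(project_root):
    depth=len(os.path.relpath(dirpath, project_root).split(os.sep))
    # directories that contain files must sit exactly at the deepest template level
    if filenames and depth != expected_depth:
        violations.append(dirpath)
print(f"{len(violations)} directories contain files at an unexpected level")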

If that is approved, each directory level can be used as a catalog attribute.

attributes=[]
directory_structure_template="mip_era/activity_id/institution_id/source_id/experiment_id/member_id/table_id/variable_id/grid_label/version"
for att in directory_structure_template.split('/'):
    attributes.append(
        dict(column_name=att,
             vocabulary=f"https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_{att}.json"
            )
    )
intake_esm_catalog["attributes"]=attributes    
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}]}

Note

For data management purposes in general, we highly recommend defining a path_template and a filename_template for a clear directory structure before storing any data.

  • You can add more attributes from files or by parsing filenames, as sketched below
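For instance, an attribute that is not encoded in the path could be read from the file itself with xarray (a sketch; the global attribute frequency is an assumption about your files):

import xarray as xr

def attribute_from_file(path, attribute="frequency"):
    # read a global attribute from a netCDF file to use it as an additional catalog column
    with xr.open_dataset(path) as ds:
        return ds.attrs.get(attribute, "unknown")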

Define the assets column of your catalog#

The assets entry is a python dictionary in the catalog, similar to an attribute, e.g.:

  "assets": {
    "column_name": "path",
    "format": "netcdf"
  },

The assets of a catalog refer to the data sources that can be loaded by intake. Assets are essential for connecting intake’s browsing functionality with its data access functionality. The assets entry contains

  • a column_name which is associated with the keyword in the collection. The value of column_name in the collection points at the asset which can be loaded by xarray.

  • a format entry which specifies the data format of the assets.

Note

If you have assets of mixed file formats, you can substitute format by format_column_name so that the format information for each asset is also taken from the collection.
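For example, for a collection that mixes netCDF and zarr assets, the assets entry could look like this (a sketch; the column name format is an assumption and must then exist in the collection):

assets_mixed={
    "column_name": "path",
    # the format of each asset is read from the "format" column of the collection
    "format_column_name": "format"
}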

assets={
    "column_name": "path",
    "format": "netcdf"
  }
intake_esm_catalog["assets"]=assets
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}], 'assets': {'column_name': 'path', 'format': 'netcdf'}}

Optional: Define aggregation control for your data sets#

Note

If aggregation_control is not defined, intake opens one xarray dataset per asset

One goal of a catalog is to make access to the data as analysis-ready as possible. Therefore, intake-esm features aggregating multiple assets into a larger single data set. If aggregation_control is defined in the catalog and users run the catalog’s to_dataset_dict() function, a Python dictionary of aggregated xarray datasets is created. The logic for merging and/or concatenating the catalog into datasets has to be configured under aggregation_control.
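For catalog users, the effect looks like the following sketch (the catalog file name is an assumption; the keys of the returned dictionary are built from the groupby_attrs defined below):

import intake
cat = intake.open_esm_datastore("Intake-esmI.json")  # hypothetical, finished catalog file
dsets = cat.to_dataset_dict()                        # dict of aggregated xarray datasets
list(dsets)                                          # keys built from the groupby_attrs, e.g. 'ScenarioMIP.DKRZ'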

The implementation works such that the variable’s dimensions are either enhanced by a new dimension or an existing dimension is extended with the new data included in the additional assets.

  • aggregation_control is a dictionary in the catalog. If it is set, three keywords have to be configured:

    • variable_column_name: In the collection, the variable name is specified under that column. Intake-esm will only aggregate assets with the same variable name. Thus, all assets to be combined into a dataset have to include at least one common variable. If your assets contain more than one data variable and users should be able to subset with intake, check multivariable assets (section 4).

    • groupby_attrs: assets with different values of the groupby_attrs will not be aggregated into one xarray dataset. E.g., if you have data from different ESMs in one catalog, you do not want users to merge them into one dataset. The groupby_attrs are combined to form the key of the aggregated dataset in the dictionary returned by to_dataset_dict().

    • aggregations: Specification of how xarray should combine assets that share the same values of the groupby_attrs.

E.g.:

  "aggregation_control": {
    "variable_column_name": "variable_id",
    "groupby_attrs": [
      "activity_id",
      "institution_id"
    ],
    "aggregations": [
      {
        "type": "union",
        "attribute_name": "variable_id"
      }
    ]
  }

Let’s start with defining variable_column_name and groupby_attrs:

aggregation_control=dict(
    variable_column_name="variable_id",
    groupby_attrs=[
        "activity_id",
        "institution_id"
    ]
)

Best Practice

  • A well-defined aggregation control contains all defined attributes

Aggregations:

aggregations is an optional list of dictionaries each of which configures

  • on which dimension of the variable the assets should be aggregated

  • optionally: what keyword arguments should be passed to xarray’s concat() and merge() functions

for one attribute/column of the catalog given as attribute_name.

A dictionary in the aggregations list is called an aggregation object and can include three specifications:

  • attribute_name: the column name which is not a groupby_attr and should be used for aggregating a single variable over a dimension

  • type: Can either be

    • join_new: concatenate the assets along a new dimension named after the attribute (e.g. member_id)

    • join_existing: concatenate the assets along an already existing dimension (e.g. time)

    • union: merge the assets’ variables into a single dataset

  • optional: options: Keyword arguments for xarray

The following defines that different values of variable_id are combined via a union, i.e. all variables of a group end up in the same dataset:

aggregation_control["aggregations"]=[dict(
    attribute_name="variable_id",
    type="union"
)]

Now, we configure intake to use the time_range attribute for extending the existing dimension time. Therefore, we have to add options with “dim”: ”time” as a keyword argument for xarray:

aggregation_control["aggregations"].append(
    dict(
        attribute_name="time_range",
        type="join_existing",
        options={ "dim": "time", "coords": "minimal", "compat": "override" }
    )
)

We can also, kind of retrospectively, combine all members of an ensemble along a new dimension of a variable:

aggregation_control["aggregations"].append(
    dict(
        attribute_name= "member_id",
        type= "join_new",
        options={ "coords": "minimal", "compat": "override" }
    )
)

Note

It is not possible to pre-configure dask options for xarray. Be sure that users of your catalog know if and how to set chunks.
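Once the catalog is opened (see section 3), users can set chunks themselves at load time, for example via the xarray_open_kwargs argument of to_dataset_dict() (a commented sketch; the chunk size is arbitrary):

# dask chunking is configured by the catalog user at load time, not in the catalog:
# dsets = cat.to_dataset_dict(xarray_open_kwargs={"chunks": {"time": 12}})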

intake_esm_catalog["aggregation_control"]=aggregation_control
print(intake_esm_catalog)
{'esmcat_version': '0.1.0', 'id': 'Intake-esmI', 'description': 'This is an intake catalog created for the intake tutorial', 'attributes': [{'column_name': 'mip_era', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_mip_era.json'}, {'column_name': 'activity_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'}, {'column_name': 'institution_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'}, {'column_name': 'source_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'}, {'column_name': 'experiment_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'}, {'column_name': 'member_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_member_id.json'}, {'column_name': 'table_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'}, {'column_name': 'variable_id', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_variable_id.json'}, {'column_name': 'grid_label', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'}, {'column_name': 'version', 'vocabulary': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_version.json'}], 'assets': {'column_name': 'path', 'format': 'netcdf'}, 'aggregation_control': {'variable_column_name': 'variable_id', 'groupby_attrs': ['activity_id', 'institution_id'], 'aggregations': [{'attribute_name': 'variable_id', 'type': 'union'}, {'attribute_name': 'time_range', 'type': 'join_existing', 'options': {'dim': 'time', 'coords': 'minimal', 'compat': 'override'}}, {'attribute_name': 'member_id', 'type': 'join_new', 'options': {'coords': 'minimal', 'compat': 'override'}}]}}

2.2. Create the database for the catalog#

The collection of assets can be specified either

  • under catalog_dict as a list of dictionaries inside the catalog. One asset including all attribute specifications is saved as an individual dictionary, e.g.:

    "catalog_dict": [
        {
            "filename": "/work/mh0287/m221078/prj/switch/icon-oes/experiments/khwX155/outdata/khwX155_atm_mon_18500101.nc",
            "variable": "tas_gmean"
        }
    ]
  • or under catalog_file which refers to a separate .csv file, e.g.

    "catalog_file": "dkrz_cmip6_disk_netcdf.csv.gz"

Option A: Catalog_dict implementation#

Assuming we would like to create a catalog for all files in /work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/, we can parse the path with our directory_structure_template:

trunk="/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/"
trunkdict={}
for i,item in enumerate(directory_structure_template.split('/')):
    trunkdict[item]=trunk.split('/')[-(len(directory_structure_template.split('/'))-i+1)]

Afterwards, we can associate all files in that directory with these attributes plus the additional time_range and path columns using os:

import os
# list all assets (files) in the directory; sorting keeps the time ranges in order
filelist=sorted(os.listdir(trunk))
catalog_dict=[]
for asset in filelist:
    assetdict={}
    # the time range is the last underscore-separated part of the filename (without suffix)
    assetdict["time_range"]=asset.split('.')[0].split('_')[-1]
    assetdict["path"]=trunk+asset
    assetdict.update(trunkdict)
    catalog_dict.append(assetdict)

Then, we put that list of asset dictionaries into the catalog:

intake_esm_catalog["catalog_dict"]=catalog_dict

Option B: Catalog_file implementation#

The catalog_file format needs to comply with the following rules:

  • all file types that can be opened by pandas are allowed to be set as catalog_file

  • the .csv file needs a header which includes all catalog attributes

An example would be:

filename,variable
/work/mh0287/m221078/prj/switch/icon-oes/experiments/khwX155/outdata/khwX155_atm_mon_18500101.nc,tas_gmean

Note

  • Note that the catalog_file can also live in the cloud, i.e. be a URL. You can host both the collection and the catalog in the cloud as DKRZ does.

Best practice

To keep a clear overview, it is best to use the same prefix name for both the catalog and the catalog_file.

import pandas as pd

catalog_dict_df=pd.DataFrame(catalog_dict)
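To actually use the catalog_file option, the DataFrame could be written to a compressed .csv file and referenced in the catalog instead of catalog_dict (a sketch; the file name Intake-esmI.csv.gz is an assumption):

# write the collection to a compressed csv file
catalog_dict_df.to_csv("Intake-esmI.csv.gz", index=False, compression="gzip")

# reference the csv file instead of embedding the assets in the catalog itself
intake_esm_catalog_file={k: v for k, v in intake_esm_catalog.items() if k != "catalog_dict"}
intake_esm_catalog_file["catalog_file"]="Intake-esmI.csv.gz"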

Saving a separate database for assets or using a dictionary in the catalog?#

Use case: Updating the collection for a living project on file system#

Solution: Write a builder script and run it as a cronjob (automatically and regularly):

A typical builder for a community project contains the following sequence:

  1. Create one or more lists of files based on a find shell command on the database directory. This type of job is also called a crawler as it crawls through the file system.

  2. Read the lists of files and create a pandas DataFrame for these files.

  3. Parse the file names and file paths and fill the column values. That can easily be done by deconstructing file paths and file names into their parts, assuming you defined a mandatory directory structure template and filename template beforehand.

    • Filenames that cannot be parsed should be sorted out

  4. The data frame is saved as a .csv file, which forms the final collection. You can also compress it to .csv.gz. A minimal builder sketch following these steps is shown below.

At DKRZ, we run scripts for project data on disk repeatedly in cronjobs to keep the catalog updated.
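A minimal builder sketch following the four steps above (assuming the directory_structure_template from section 2.1 and netCDF file names that end with _<time_range>.nc; not a production-ready crawler):

import os
import pandas as pd

levels=directory_structure_template.split('/')

def build_collection(root):
    """Crawl root and return the collection as a pandas DataFrame."""
    rows=[]
    for dirpath, dirnames, filenames in os.walk(root):
        parts=os.path.relpath(dirpath, root).split(os.sep)
        if len(parts) != len(levels):
            continue  # sort out directories that do not match the template depth
        for fname in filenames:
            if not fname.endswith(".nc"):
                continue  # sort out files that cannot be parsed
            row=dict(zip(levels, parts))
            row["time_range"]=fname.rsplit('.', 1)[0].split('_')[-1]
            row["path"]=os.path.join(dirpath, fname)
            rows.append(row)
    return pd.DataFrame(rows)

# collection_df=build_collection("/work/ik1017/CMIP6/data")  # placeholder project root
# collection_df.to_csv("project_collection.csv.gz", index=False, compression="gzip")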

Builder tool examples#

  • The NCAR builder tool for community projects like CMIP6 and CMIP5.

  • DKRZ builder notebooks (based on NCAR tools) like this Era5 notebook

3. Validate and save the catalog#

If we open the defined catalog with open_esm_datastore() and try to_dataset_dict(), we can check if our creation is successful. The resulting catalog should give us exactly 1 dataset from 18 assets as we aggregate over time.

import intake
validated_cat=intake.open_esm_datastore(
    obj=dict(
        df=catalog_dict_df,
        esmcat=intake_esm_catalog
    )
)
validated_cat

Intake-esmI catalog with 1 dataset(s) from 1 asset(s):

unique
time_range 1
path 1
mip_era 1
activity_id 1
institution_id 1
source_id 1
experiment_id 1
member_id 1
table_id 1
variable_id 1
grid_label 1
version 1
derived_variable_id 0
validated_cat.to_dataset_dict()
--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id'
0.00% [0/1 00:00<?]
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
    ...
FileNotFoundError: [Errno 2] No such file or directory: "/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/ls: cannot open directory '/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/tas/gn/v20190710/': Permission denied"

The above exception was the direct cause of the following exception:

ESMDataSourceError                        Traceback (most recent call last)
    ...
ESMDataSourceError: Failed to load dataset with key='ScenarioMIP.DKRZ'
                 You can use `cat['ScenarioMIP.DKRZ'].df` to inspect the assets/files for this key.

Intake-esm allows writing catalog file(s) with the serialize() function. The only required argument is the name of the catalog, which will be used as the filename. It either writes the two parts of the catalog together into one .json file:

validated_cat.serialize("validated_cat")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/validated_cat.json

Or into two separate files if we provide catalog_type="file" as a second argument. The .json file may become very large, while we can save disk space if we save the database in a separate .csv.gz file:

validated_cat.serialize("validated_cat", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/validated_cat.json
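The written catalog can afterwards be reopened directly from the .json file (a sketch, assuming the files were written to the current working directory):

# reopen the serialized catalog from disk
cat_from_file=intake.open_esm_datastore("validated_cat.json")
cat_from_file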

4. Multivariable assets#

If an asset contains more than one variable, intake-esm also features pre-selection of a variable before loading the data. Here is a user guide on how to configure the collection for that.

  1. the variable_column of the catalog must contain iterables (list, tuple, set) of values.

  2. the user must specify a dictionary of functions for converting values in certain columns into iterables. This is done via the csv_kwargs argument, such that the collection needs to be opened as follows:

import ast
import intake

col = intake.open_esm_datastore(
    "multi-variable-collection.json",
    csv_kwargs={"converters": {"variable": ast.literal_eval}},
)
col
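A collection for such a catalog could contain rows like the following (a hypothetical example; the variable column holds string representations of lists, which ast.literal_eval converts back into Python lists):

path,variable
/path/to/ocean_day_0001.nc,"['temp', 'salt', 'zeta']"
/path/to/ocean_day_0002.nc,"['temp', 'salt', 'zeta']"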