Intake IV - preprocessing and derived variables#

Agenda

Based on DKRZ’s CMIP6 catalog, you learn in this part how to

  1. pass a preprocessing function to to_dataset_dict()

  2. create a derived variable registry

import intake
#dkrz_catalog=intake.open_catalog(["https://dkrz.de/s/intake"])
#only for generating the web page do we need to use the original link:
dkrz_cdp=intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])
esm_dkrz=dkrz_cdp.dkrz_cmip6_disk

Use preprocessing when opening assets and creating datasets#

When calling intake-esm’s to_dataset_dict function, we can pass an argument preprocess. Its value should be a function that is applied to each asset’s dataset right after it is opened, before the individual datasets are combined.

Note

For CMIP6, a preprocessing package has been developed for homogenizing and preparing datasets of different ESMs for a joint analysis. Among other things, it provides

  • renaming and setting of coordinates

  • adjusting grid values to fit into a common range (0-360 for lon)
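
A minimal usage sketch, assuming the package in question is xmip (formerly cmip6_preprocessing) and that it is installed; its combined entry point can be passed directly as preprocess:

# Sketch only: xmip's combined_preprocessing homogenizes coordinate names,
# grid values and metadata across ESMs before the datasets are combined.
from xmip.preprocessing import combined_preprocessing

xmip_dsets = esm_dkrz.search(
    variable_id="tas",
    table_id="Amon",
    source_id="MPI-ESM1-2-HR",
    member_id="r1i1p1f1",
    experiment_id="ssp370"
).to_dataset_dict(
    xarray_open_kwargs={"chunks": {"time": 1}},
    preprocess=combined_preprocessing
)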

If you would like to write your own preprocessing, e.g. to set some specific variables as coordinates, you can define a function which

  • receives an xarray dataset as an argument

  • returns a new xarray dataset

def correct_coordinates(ds):
    """Convert wrongly assigned data_vars to coordinates."""
    ds = ds.copy()
    for co in [
        "x",
        "y",
        "lon",
        "lat",
        "lev",
        "bnds",
        "lev_bounds",
        "lon_bounds",
        "lat_bounds",
        "time_bounds",
        "lat_verticies",
        "lon_verticies",
    ]:
        if co in ds.variables:
            ds = ds.set_coords(co)
    return ds

Now, when you create the dictionary of datasets, pass this function via the preprocess keyword:

cat=esm_dkrz.search(variable_id="tas",
                   table_id="Amon",
                   source_id="MPI-ESM1-2-HR",
                   member_id="r1i1p1f1",
                   experiment_id="ssp370"
                  )
test_dsets=cat.to_dataset_dict(
    xarray_open_kwargs={"chunks":{"time":1}},
    preprocess=correct_coordinates
)
--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.source_id.experiment_id.table_id.grid_label'
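
To verify that the preprocessing was applied, you can inspect the coordinates of the returned datasets:

# The dictionary maps keys of the form
# 'activity_id.source_id.experiment_id.table_id.grid_label' to xarray datasets.
for key, ds in test_dsets.items():
    print(key)
    # Variables from the list in correct_coordinates that were stored as
    # data variables now appear among the coordinates.
    print(list(ds.coords))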

Derived variables#

Most of the following is taken from the intake-esm tutorial.

A “derived variable” in this case is a variable that doesn’t itself exist in an intake-esm catalog, but can be computed (i.e., “derived”) from variables that do exist in the catalog. Currently, the derived variable implementation requires variables on the same grid, etc.; i.e., it assumes that all variables involved can be merged within the same dataset. […] Derived variables could include more sophisticated diagnostic output like aggregations of terms in a tracer budget or gradients in a particular field.

The registry of derived variables can be connected to the catalog. When users then search for and open a derived variable, intake-esm loads the required input variables from the catalog and applies the registered derivation function on the fly.

import intake
import intake_esm
from intake_esm import DerivedVariableRegistry
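
A minimal sketch of how such a registry can be built and connected, assuming a hypothetical derived variable name sfcWindSpeed computed from the catalog variables uas and vas, and a placeholder path for the catalog's JSON description:

dvr = DerivedVariableRegistry()

# Register a hypothetical derived variable: near-surface wind speed,
# computed from its eastward (uas) and northward (vas) components.
@dvr.register(variable="sfcWindSpeed", query={"variable_id": ["uas", "vas"]})
def calc_wind_speed(ds):
    ds["sfcWindSpeed"] = (ds["uas"] ** 2 + ds["vas"] ** 2) ** 0.5
    return ds

# Connect the registry when opening the datastore. Replace the placeholder
# path with the JSON description of the catalog you want to use.
esm_dkrz_dv = intake.open_esm_datastore(
    "path/to/dkrz_cmip6_disk.json", registry=dvr
)

# Searching for the derived variable also pulls in the variables it
# depends on; the registered function is applied when the datasets are loaded.
wind_cat = esm_dkrz_dv.search(variable_id="sfcWindSpeed", table_id="Amon")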