Intake III - work with two catalogs and merge them#

Agenda

Based on DKRZ's CMIP6 and CORDEX catalogs and Pangeo's CMIP6 catalog, in this part you will learn

how to merge catalogs by combining their databases with differently formatted assets.

This includes

  1. Loading catalogs with a user-defined set of attributes

  2. Comparing metadata to check for compatibility

  3. Merging the databases via merge or concat

  4. Configuring a catalog description for accessing datasets across projects

  5. Making ALL data accessible and consolidating the aggregation

  6. Saving the new catalog

Questions

  • How can you find out how compatible the catalogs are? Would you have to sanitize the column names?

  • What is the overlap? Which of the about 1000 Pangeo datasets are included in DKRZ's?

This tutorial highlights two use cases:

  1. Merging two projects (CMIP6 and CORDEX, both from DKRZ)

  2. Merging two databases for the same project (CMIP6 DKRZ and CMIP6 Pangeo)

For each case, the ultimate goal is to create a merged catalog which also enables data access to the datasets of both source catalogs.

Case 1: Merge two projects CMIP6 and CORDEX in one catalog#

import intake
import pandas as pd
#dkrz_catalog=intake.open_catalog(["https://dkrz.de/s/intake"])
#only for generating the web page we need to use the original link:
dkrz_catalog=intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])
print([entry for entry in list(dkrz_catalog) if "disk" in entry and ("cordex" in entry or "cmip6" in entry)])
['dkrz_cmip6_disk', 'dkrz_cordex_disk']

Load catalogs with default + common columns#

Most DKRZ catalogs include the common cataloonie attributes. This simplifies merging because the catalogs can already be merged on these columns. The usable columns of each catalog are stored in the main catalog's metadata and can be displayed and retrieved:

dkrz_catalog.metadata
{'parameters': {'additional_cmip6_disk_columns': {'default': ['units',
    'path',
    'opendap_url',
    'long_name'],
   'type': 'list[str]'},
  'additional_era5_disk_columns': {'default': ['path',
    'units',
    'long_name',
    'short_name'],
   'type': 'list[str]'},
  'cataloonie_columns': {'default': ['project',
    'institution_id',
    'source_id',
    'experiment_id',
    'simulation_id',
    'realm',
    'frequency',
    'time_reduction',
    'grid_label',
    'grid_id',
    'level_type',
    'time_min',
    'time_max',
    'time_range',
    'format',
    'uri',
    'variable_id'],
   'type': 'list[str]'}}}
overall_columns=dkrz_catalog.metadata["parameters"]["cataloonie_columns"]["default"]
print(overall_columns)
['project', 'institution_id', 'source_id', 'experiment_id', 'simulation_id', 'realm', 'frequency', 'time_reduction', 'grid_label', 'grid_id', 'level_type', 'time_min', 'time_max', 'time_range', 'format', 'uri', 'variable_id']

However, these attributes are not sufficient for identifying an individual asset in CORDEX and CMIP6. We need additional columns:

cordex_columns=dkrz_catalog._entries["dkrz_cordex_disk"]._open_args["read_csv_kwargs"]["usecols"]
print(cordex_columns)
cmip6_columns=dkrz_catalog._entries["dkrz_cmip6_disk"]._open_args["read_csv_kwargs"]["usecols"]
print(cmip6_columns)
['project', 'product_id', 'CORDEX_domain', 'institute_id', 'driving_model_id', 'experiment_id', 'member', 'model_id', 'rcm_version_id', 'frequency', 'variable_id', 'version', 'time_range', 'uri', 'format']
['project', 'activity_id', 'source_id', 'institution_id', 'experiment_id', 'member_id', 'dcpp_init_year', 'table_id', 'variable_id', 'grid_label', 'version', 'time_range', 'uri', 'format']
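
Before merging, it can help to see which of these columns the two projects share and which are project-specific. The following is a small sketch using set operations on the column lists printed above:

print(set(cmip6_columns) & set(cordex_columns))  # columns used by both projects
print(set(cmip6_columns) - set(cordex_columns))  # CMIP6-only columns
print(set(cordex_columns) - set(cmip6_columns))  # CORDEX-only columns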

We open both catalogs with the columns that we have found:

cmip6_cat=dkrz_catalog.dkrz_cmip6_disk(read_csv_kwargs=dict(usecols=cmip6_columns+overall_columns))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (21,22,23) have mixed types. Specify dtype option on import or set low_memory=False.
  df = pd.read_csv(
cordex_cat=dkrz_catalog.dkrz_cordex_disk(read_csv_kwargs=dict(usecols=cordex_columns+overall_columns))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (22) have mixed types. Specify dtype option on import or set low_memory=False.
  df = pd.read_csv(

We assume that we are interested in the variable tas, which is the near-surface air temperature:

cmip6_cat=cmip6_cat.search(variable_id="tas")
cordex_cat=cordex_cat.search(variable_id="tas")

Merge both catalogs#

The underlying DataFrames have different columns. We add CMIP6 columns to the CORDEX catalog and vice versa so that we can merge:

for cordex_col in list(set(cordex_columns)-set(overall_columns)):
    cmip6_cat.df.loc[:,cordex_col]="None"
for cmip6_col in list(set(cmip6_columns)-set(overall_columns)):
    cordex_cat.df.loc[:,cmip6_col]="None"
for column in overall_columns+cmip6_columns+cordex_columns :
    cmip6_cat.df[column]=cmip6_cat.df[column].astype(str)
    cordex_cat.df[column]=cordex_cat.df[column].astype(str)
overall_df=pd.merge(cmip6_cat.df, cordex_cat.df, on=overall_columns+cmip6_columns+cordex_columns, how="outer")
overall_df
activity_id institution_id source_id experiment_id member_id table_id variable_id grid_label dcpp_init_year version ... time_max format uri member model_id institute_id rcm_version_id driving_model_id product_id CORDEX_domain
0 AerChemMIP BCC BCC-ESM1 hist-piAer r1i1p1f1 Amon tas gn nan None ... 201412 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... None None None None None None None
1 AerChemMIP BCC BCC-ESM1 hist-piAer r2i1p1f1 Amon tas gn nan None ... 201412 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... None None None None None None None
2 AerChemMIP BCC BCC-ESM1 hist-piAer r2i1p1f1 day tas gn nan None ... 20141231 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... None None None None None None None
3 AerChemMIP BCC BCC-ESM1 hist-piAer r3i1p1f1 Amon tas gn nan None ... 201412 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... None None None None None None None
4 AerChemMIP BCC BCC-ESM1 hist-piAer r3i1p1f1 day tas gn nan None ... 20141231 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... None None None None None None None
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
211137 None MPI-CSC MPI-CSC-REMO2009 rcp85 None None tas nan None None ... 206011.0 netcdf /work/kd0956/CORDEX/data/cordex/output/WAS-44i... r1i1p1 MPI-CSC-REMO2009 MPI-CSC v1 MPI-M-MPI-ESM-LR output WAS-44i
211138 None MPI-CSC MPI-CSC-REMO2009 rcp85 None None tas nan None None ... 207011.0 netcdf /work/kd0956/CORDEX/data/cordex/output/WAS-44i... r1i1p1 MPI-CSC-REMO2009 MPI-CSC v1 MPI-M-MPI-ESM-LR output WAS-44i
211139 None MPI-CSC MPI-CSC-REMO2009 rcp85 None None tas nan None None ... 208011.0 netcdf /work/kd0956/CORDEX/data/cordex/output/WAS-44i... r1i1p1 MPI-CSC-REMO2009 MPI-CSC v1 MPI-M-MPI-ESM-LR output WAS-44i
211140 None MPI-CSC MPI-CSC-REMO2009 rcp85 None None tas nan None None ... 209011.0 netcdf /work/kd0956/CORDEX/data/cordex/output/WAS-44i... r1i1p1 MPI-CSC-REMO2009 MPI-CSC v1 MPI-M-MPI-ESM-LR output WAS-44i
211141 None MPI-CSC MPI-CSC-REMO2009 rcp85 None None tas nan None None ... 210011.0 netcdf /work/kd0956/CORDEX/data/cordex/output/WAS-44i... r1i1p1 MPI-CSC-REMO2009 MPI-CSC v1 MPI-M-MPI-ESM-LR output WAS-44i

211142 rows × 29 columns
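
To double-check that assets from both projects ended up in the merged DataFrame, we can count the rows per project. This is only a quick sketch; project is one of the shared cataloonie columns:

overall_df["project"].value_counts()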

Redefine catalog description#

We copy the entire .json catalog description so that we can edit it.

mixed_esmcol_data=dict(cmip6_cat.esmcat).copy()
mixed_esmcol_data["aggregation_control"]=dict(mixed_esmcol_data["aggregation_control"]).copy()
mixed_esmcol_data
{'esmcat_version': '0.1.0',
 'attributes': [Attribute(column_name='project', vocabulary=''),
  Attribute(column_name='activity_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'),
  Attribute(column_name='source_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'),
  Attribute(column_name='institution_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'),
  Attribute(column_name='experiment_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'),
  Attribute(column_name='member_id', vocabulary=''),
  Attribute(column_name='table_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'),
  Attribute(column_name='variable_id', vocabulary=''),
  Attribute(column_name='grid_label', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'),
  Attribute(column_name='version', vocabulary=''),
  Attribute(column_name='dcpp_init_year', vocabulary=''),
  Attribute(column_name='time_range', vocabulary=''),
  Attribute(column_name='uri', vocabulary=''),
  Attribute(column_name='simulation_id', vocabulary=''),
  Attribute(column_name='realm', vocabulary=''),
  Attribute(column_name='frequency', vocabulary=''),
  Attribute(column_name='time_reduction', vocabulary=''),
  Attribute(column_name='grid_label', vocabulary=''),
  Attribute(column_name='grid_id', vocabulary=''),
  Attribute(column_name='level_type', vocabulary=''),
  Attribute(column_name='time_min', vocabulary=''),
  Attribute(column_name='time_max', vocabulary=''),
  Attribute(column_name='opendap_url', vocabulary=''),
  Attribute(column_name='path', vocabulary=''),
  Attribute(column_name='units', vocabulary=''),
  Attribute(column_name='long_name', vocabulary='')],
 'assets': Assets(column_name='uri', format=None, format_column_name='format'),
 'aggregation_control': {'variable_column_name': 'variable_id',
  'groupby_attrs': ['activity_id',
   'source_id',
   'experiment_id',
   'table_id',
   'grid_label'],
  'aggregations': [Aggregation(type=<AggregationType.union: 'union'>, attribute_name='variable_id', options={}),
   Aggregation(type=<AggregationType.join_existing: 'join_existing'>, attribute_name='time_range', options={'compat': 'override', 'coords': 'minimal', 'dim': 'time'}),
   Aggregation(type=<AggregationType.join_new: 'join_new'>, attribute_name='member_id', options={'compat': 'override', 'coords': 'minimal'}),
   Aggregation(type=<AggregationType.join_new: 'join_new'>, attribute_name='dcpp_init_year', options={'compat': 'override', 'coords': 'minimal'})]},
 'id': '/work/ik1017/Catalogs/dkrz_cmip6_disk',
 'catalog_dict': None,
 'catalog_file': None,
 'description': "This is a ESM-collection for CMIP6 data on DKRZ's disk storage system which will be loaded from a source file which is in the cloud (see catalog_file)",
 'title': None,
 'last_updated': None}

Let’s have a look at these entries. We can subdivide these into two groups:

  1. Required to be manually changed:

  • groupby_attrs: We will change it such that both CMIP6 and CORDEX datasets can be created.

  • attributes and default_columns: The CORDEX ones need to be added

  • aggregation_control: Must be revised but we can do that afterwards. For now, we will just delete all entries but the one for variable_id

  2. Other attributes

  • assets: Will stay the same as there is no difference between the original catalogs

  • catalog_file: Will be automatically overwritten by Intake when the final catalog is written.

  • description, esmcat_version, id: arbitrary

We will start with adding missing attributes:

columns_already=[dict(k)["column_name"] for k in mixed_esmcol_data["attributes"]]
columns_already
['project',
 'activity_id',
 'source_id',
 'institution_id',
 'experiment_id',
 'member_id',
 'table_id',
 'variable_id',
 'grid_label',
 'version',
 'dcpp_init_year',
 'time_range',
 'uri',
 'simulation_id',
 'realm',
 'frequency',
 'time_reduction',
 'grid_label',
 'grid_id',
 'level_type',
 'time_min',
 'time_max',
 'opendap_url',
 'path',
 'units',
 'long_name']
for k in dict(cordex_cat.esmcat)["attributes"] :
    if dict(k)["column_name"] not in columns_already:
        mixed_esmcol_data["attributes"].append(k)

groupby_attrs:

The attributes used to build an index for a dataset are defined by the order of attributes in the groupby_attrs list. The aggregation methods for CMIP6 and CORDEX datasets differ.

We have to redefine this list. Think about the perfect order and arrangement of attributes.

mixed_esmcol_data["aggregation_control"]["groupby_attrs"]=[
    "CORDEX_domain",
    "driving_model_id",
    "activity_id",
    "institute_id",
    "model_id",
    "experiment_id",
    "frequency",
    "table_id",
    "grid_label"
]
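
Grouping only works if every attribute in groupby_attrs exists as a column of the merged database. A quick sanity check could look like this (only a sketch that compares the lists):

missing=[attr for attr in mixed_esmcol_data["aggregation_control"]["groupby_attrs"]
         if attr not in overall_df.columns]
print(missing)  # should be an empty list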

aggregation_control

For now, we drop all aggregation attributes besides variable_id to enable a quick save of the catalog. Note that the grouping only works if there is at least one entry left in the mixed_esmcol_data["aggregation_control"]["aggregations"] list.

#iterate over a copy of the list because removing entries while iterating would skip elements
for entry in list(mixed_esmcol_data["aggregation_control"]["aggregations"]):
    if dict(entry)["attribute_name"] != "variable_id" :
        mixed_esmcol_data["aggregation_control"]["aggregations"].remove(entry)
mixed_esmcol_data
{'esmcat_version': '0.1.0',
 'attributes': [Attribute(column_name='project', vocabulary=''),
  Attribute(column_name='activity_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json'),
  Attribute(column_name='source_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json'),
  Attribute(column_name='institution_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json'),
  Attribute(column_name='experiment_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json'),
  Attribute(column_name='member_id', vocabulary=''),
  Attribute(column_name='table_id', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json'),
  Attribute(column_name='variable_id', vocabulary=''),
  Attribute(column_name='grid_label', vocabulary='https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json'),
  Attribute(column_name='version', vocabulary=''),
  Attribute(column_name='dcpp_init_year', vocabulary=''),
  Attribute(column_name='time_range', vocabulary=''),
  Attribute(column_name='uri', vocabulary=''),
  Attribute(column_name='simulation_id', vocabulary=''),
  Attribute(column_name='realm', vocabulary=''),
  Attribute(column_name='frequency', vocabulary=''),
  Attribute(column_name='time_reduction', vocabulary=''),
  Attribute(column_name='grid_label', vocabulary=''),
  Attribute(column_name='grid_id', vocabulary=''),
  Attribute(column_name='level_type', vocabulary=''),
  Attribute(column_name='time_min', vocabulary=''),
  Attribute(column_name='time_max', vocabulary=''),
  Attribute(column_name='opendap_url', vocabulary=''),
  Attribute(column_name='path', vocabulary=''),
  Attribute(column_name='units', vocabulary=''),
  Attribute(column_name='long_name', vocabulary=''),
  Attribute(column_name='CORDEX_domain', vocabulary=''),
  Attribute(column_name='product_id', vocabulary=''),
  Attribute(column_name='institute_id', vocabulary=''),
  Attribute(column_name='driving_model_id', vocabulary=''),
  Attribute(column_name='member', vocabulary=''),
  Attribute(column_name='model_id', vocabulary=''),
  Attribute(column_name='rcm_version_id', vocabulary=''),
  Attribute(column_name='format', vocabulary='')],
 'assets': Assets(column_name='uri', format=None, format_column_name='format'),
 'aggregation_control': {'variable_column_name': 'variable_id',
  'groupby_attrs': ['CORDEX_domain',
   'driving_model_id',
   'activity_id',
   'institute_id',
   'model_id',
   'experiment_id',
   'frequency',
   'table_id',
   'grid_label'],
  'aggregations': [Aggregation(type=<AggregationType.union: 'union'>, attribute_name='variable_id', options={})]},
 'id': '/work/ik1017/Catalogs/dkrz_cmip6_disk',
 'catalog_dict': None,
 'catalog_file': None,
 'description': "This is a ESM-collection for CMIP6 data on DKRZ's disk storage system which will be loaded from a source file which is in the cloud (see catalog_file)",
 'title': None,
 'last_updated': None}

NaN values in the groupby columns cause trouble in the DataFrame, so we set them to "notset":

for k in mixed_esmcol_data["aggregation_control"]["groupby_attrs"]:
    overall_df[overall_df[k].isna()]="notset"
    overall_df[k]=overall_df[k].str.replace("None","notset")
    overall_df[k]=overall_df[k].str.replace("nan","notset")
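
As a quick check (only a sketch), we can count how many of the old missing-value markers are left in the groupby columns; all counts should be zero:

check_cols=mixed_esmcol_data["aggregation_control"]["groupby_attrs"]
print(overall_df[check_cols].isin(["nan","None"]).sum())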

Now, let us open the combined intake catalog:

cmip6andcordex=intake.open_esm_datastore(
    obj=dict(
        esmcat=mixed_esmcol_data,
        df=overall_df
    )
)

We write the new catalog to disk via:

cmip6andcordex.serialize("test", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/test.json

We can test if our configuration works by directly opening it:

intake.open_esm_datastore("test.json").search(experiment_id="historical",
                                              source_id="MPI*",
                                              simulation_id="r1i1*")#.to_dataset_dict(cdf_kwargs=dict(chunks=dict(time=1)))
/envs/lib/python3.11/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (4,13,15,16,22,25,27) have mixed types. Specify dtype option on import or set low_memory=False.
  df = pd.read_csv(

test catalog with 27 dataset(s) from 633 asset(s):

unique
activity_id 2
institution_id 3
source_id 4
experiment_id 1
member_id 2
table_id 7
variable_id 1
grid_label 2
dcpp_init_year 0
version 0
time_range 399
project 2
simulation_id 3
grid_id 1
frequency 7
time_reduction 2
realm 1
level_type 0
time_min 324
time_max 323
format 1
uri 633
member 1
model_id 2
institute_id 2
rcm_version_id 1
driving_model_id 2
product_id 1
CORDEX_domain 8
derived_variable_id 0

Case 2: Merge two databases for CMIP6#

Assume you are interested in the variable tas from the table Amon in both catalogs.

You would start like this:

pangeo=intake.open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/master.yaml")
#
print(list(pangeo.climate))
cmip6_pangeo=intake.open_esm_datastore("https://storage.googleapis.com/cmip6/pangeo-cmip6.json")
cmip6_cat=dkrz_catalog.dkrz_cmip6_disk
#
esm_dkrz_tas=cmip6_cat.search(
        variable_id="tas",
        table_id="Amon"
)
esm_pangeo_tas=cmip6_pangeo.search(
        variable_id="tas",
        table_id="Amon"
)
['cmip6_gcs', 'cmip6_s3', 'GFDL_CM2_6', 'GFDL_CM2_6_s3', 'tracmip', 'tracmip_s3']
print(esm_dkrz_tas)
</work/ik1017/Catalogs/dkrz_cmip6_disk catalog with 1362 dataset(s) from 66069 asset(s)>
print(esm_pangeo_tas)
<pangeo-cmip6 catalog with 975 dataset(s) from 15427 asset(s)>


Compare the Metadata#

  1. Both catalogs follow the esmcat specs, which can be seen from the following entry:

print(dict(esm_dkrz_tas.esmcat)["esmcat_version"])
print(dict(esm_pangeo_tas.esmcat)["esmcat_version"])
0.1.0
0.1.0
  2. As both catalogs follow the esmcat standard, they each provide a list of attributes which we can compare. In the following, we use pandas DataFrames for a better display:

import pandas as pd
esm_dkrz_atts_df=pd.DataFrame([dict(k) for k in dict(esm_dkrz_tas.esmcat)["attributes"]])
esm_pangeo_atts_df=pd.DataFrame([dict(k) for k in dict(esm_pangeo_tas.esmcat)["attributes"]])
esm_dkrz_atts_df
column_name vocabulary
0 project
1 activity_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
2 source_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
3 institution_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
4 experiment_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
5 member_id
6 table_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
7 variable_id
8 grid_label https://raw.githubusercontent.com/WCRP-CMIP/CM...
9 version
10 dcpp_init_year
11 time_range
12 uri
13 simulation_id
14 realm
15 frequency
16 time_reduction
17 grid_label
18 grid_id
19 level_type
20 time_min
21 time_max
22 opendap_url
23 path
24 units
25 long_name
esm_pangeo_atts_df
column_name vocabulary
0 activity_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
1 source_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
2 institution_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
3 experiment_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
4 member_id
5 table_id https://raw.githubusercontent.com/WCRP-CMIP/CM...
6 variable_id
7 grid_label https://raw.githubusercontent.com/WCRP-CMIP/CM...
8 version
9 dcpp_start_year
esm_pangeo_atts_df.equals(esm_dkrz_atts_df)
False
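
The False result already tells us that the attribute lists differ. To see which columns only exist in one of the two databases, a small sketch with set operations on the DataFrame columns:

print(set(esm_dkrz_tas.df.columns) - set(esm_pangeo_tas.df.columns))
print(set(esm_pangeo_tas.df.columns) - set(esm_dkrz_tas.df.columns))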

When working with both catalogs, you would notice that Pangeo does not use the prefix character 'v' for the values of version whereas DKRZ does. We fix that with:

esm_pangeo_tas.df["version"]= "v" + esm_pangeo_tas.df["version"].astype(str)
  3. The data format: The Pangeo catalog contains zarr datasets stored in Google Cloud Storage, while DKRZ's catalog allows different formats by providing a column named format. When we combine these catalogs, we have to account for the different formats:

esm_pangeo_tas.esmcat.assets.format
<DataFormat.zarr: 'zarr'>
print(
    esm_dkrz_tas.df["format"].unique(),
    esm_dkrz_tas.esmcat.assets.format_column_name
)
['netcdf'] format

Combine the databases with the underlying DataFrames#

This is a workflow for creating a merged database:

  1. Find all common column names/keys that are in both data bases.

  2. Create a filtered Catalog

    1. Setting common columns as index in both catalogs

    2. Throwing out rows of one catalog whose index also occurs in the other.

  3. Concat the filtered catalog with the reference catalog.

Let us start with 1.:

keys = [key 
        for key in esm_dkrz_tas.df.columns.values 
        if key in esm_pangeo_tas.df.columns.values
       ]
keys
['activity_id',
 'institution_id',
 'source_id',
 'experiment_id',
 'member_id',
 'table_id',
 'variable_id',
 'grid_label',
 'dcpp_init_year',
 'version']

We continue with 2.:

  2. Create a filtered catalog

We create a multi-index from all common keys with set_index and save it in the new variables i1 and i2. These can be used as a filter; the ~ sign reverses the condition in the filter:

i1 = esm_pangeo_tas.df.set_index(keys).index
i2 = esm_dkrz_tas.df.set_index(keys).index
esm_pangeo_tas_filtered=esm_pangeo_tas.df[~i1.isin(i2)]
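
This also answers the overlap question from the agenda: the number of Pangeo assets that are already covered by the DKRZ catalog equals the number of rows we just filtered out. A quick sketch:

print(len(esm_pangeo_tas.df) - len(esm_pangeo_tas_filtered))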

And finally, 3.

  3. Concat the filtered catalog with the reference catalog.

We use the pandas concat function and ignore the indices of both catalogs.

esm_merged=pd.concat([esm_dkrz_tas.df, esm_pangeo_tas_filtered],
                     ignore_index=True)
esm_merged
activity_id institution_id source_id experiment_id member_id table_id variable_id grid_label dcpp_init_year version time_range project format uri zstore
0 AerChemMIP BCC BCC-ESM1 hist-piAer r1i1p1f1 Amon tas gn NaN v20200428 185001-201412 CMIP6 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... NaN
1 AerChemMIP BCC BCC-ESM1 hist-piAer r2i1p1f1 Amon tas gn NaN v20200430 185001-201412 CMIP6 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... NaN
2 AerChemMIP BCC BCC-ESM1 hist-piAer r3i1p1f1 Amon tas gn NaN v20200430 185001-201412 CMIP6 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... NaN
3 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 Amon tas gn NaN v20190621 185001-201412 CMIP6 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... NaN
4 AerChemMIP BCC BCC-ESM1 hist-piNTCF r2i1p1f1 Amon tas gn NaN v20190621 185001-201412 CMIP6 netcdf /work/ik1017/CMIP6/data/CMIP6/AerChemMIP/BCC/B... NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
72250 ScenarioMIP MIROC MIROC6 ssp245 r50i1p1f1 Amon tas gn NaN v20210917 NaN NaN NaN NaN gs://cmip6/CMIP6/ScenarioMIP/MIROC/MIROC6/ssp2...
72251 ScenarioMIP MIROC MIROC6 ssp245 r29i1p1f1 Amon tas gn NaN v20210917 NaN NaN NaN NaN gs://cmip6/CMIP6/ScenarioMIP/MIROC/MIROC6/ssp2...
72252 CMIP MOHC HadGEM3-GC31-LL piControl r1i1p1f1 Amon tas gn NaN v20211103 NaN NaN NaN NaN gs://cmip6/CMIP6/CMIP/MOHC/HadGEM3-GC31-LL/piC...
72253 CMIP CMCC CMCC-CM2-SR5 historical r3i1p2f1 Amon tas gn NaN v20211108 NaN NaN NaN NaN gs://cmip6/CMIP6/CMIP/CMCC/CMCC-CM2-SR5/histor...
72254 CMIP CMCC CMCC-CM2-SR5 historical r2i1p2f1 Amon tas gn NaN v20211109 NaN NaN NaN NaN gs://cmip6/CMIP6/CMIP/CMCC/CMCC-CM2-SR5/histor...

72255 rows × 15 columns

Make ALL data accessible and consolidate aggregation#

Intake can load assets of different formats. For that,

  1. the database must have a column which describes the format of each asset.

  2. the information on how to access an asset must be in a single column. In our example, the zstore column and the uri column need to be merged into one common column, which we name uri.

  3. the assets entry in the catalog description needs to be adapted to the new configuration.

We start with

  1. creating a ‘format’ column which is zarr if there is no entry in uri and netcdf in all other cases.

esm_merged["format"]="netcdf"
esm_merged.loc[pd.isna(esm_merged["uri"]),"format"]="zarr"
esm_merged.loc[pd.isna(esm_merged["time_range"]),"time_range"]="*"
  2. We merge the zstore and uri columns into one common column uri. Instead of looping over the rows with iterrows, we use a vectorized assignment.

Note

Whenever you can, avoid using iterrows because it is rather slow.

esm_merged.loc[pd.isna(esm_merged["uri"]),"uri"]=esm_merged["zstore"]
del esm_merged["zstore"]
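
As a quick consistency check (again only a sketch), every asset should now have a uri and a format:

print(esm_merged["format"].value_counts())
print(esm_merged["uri"].isna().sum())  # expect 0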
  3. We now create a new description.

This will be based on the DKRZ catalog because it defines the aggregation over time, which we want to maintain.

The assets entry now does not have a direct description of the format but instead a specification of a format_column_name. Also, the column_name is uri instead of path:

new_cat_json=dict(esm_dkrz_tas.esmcat).copy()
new_cat_json["assets"]={
    "column_name":"uri",
    "format_column_name":"format"
}
new_cat_json["id"]="Merged dkrz-pangeo cmip6 subset catalog"

In order to make zarr stores compatible with the aggregation over time, we have to fill in a dummy value in time_range:

esm_merged.loc[pd.isna(esm_merged["time_range"]),"time_range"]="*"

Save the new catalog#

Let us test the new catalog first. We can open the new catalog by providing two arguments to open_esm_datastore:

  • the database esm_merged

  • the catalog description new_cat_json

Afterwards, we search for a subset which is covered by both catalogs:

esm_merged_cat=intake.open_esm_datastore(
    dict(
        esmcat=new_cat_json,
        df=esm_merged
    )
)
esm_merged_cat_test=esm_merged_cat.search(activity_id="ScenarioMIP",
                                          member_id="r1i1p1f1",
                                          grid_label="gn",
                                         source_id=["MPI-ESM1-2-HR","CAS-ESM2-0"])

Since we have two different formats in the catalog, we have to provide keyword arguments for both formats within the to_dataset_dict function.

  • zarr_kwargs={"consolidated":True} is needed because Pangeo’s zarr assets have consolidated metadata

  • cdf_kwargs={"chunks":{"time":1}} configures dask to not use very large arrays

test_dsets=esm_merged_cat_test.to_dataset_dict(
    zarr_kwargs={"consolidated":True},
    cdf_kwargs={"chunks":{"time":1}}
)
--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.source_id.experiment_id.table_id.grid_label'
/tmp/ipykernel_711/38849239.py:1: DeprecationWarning: cdf_kwargs and zarr_kwargs are deprecated and will be removed in a future version. Please use xarray_open_kwargs instead.
  test_dsets=esm_merged_cat_test.to_dataset_dict(
0.00% [0/8 00:00<?]
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
  warnings.warn(
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
  warnings.warn(
/envs/lib/python3.11/site-packages/xarray/core/dataset.py:265: UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading.
  warnings.warn(
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /envs/lib/python3.11/site-packages/intake_esm/source.py:244, in ESMDataSource._open_dataset(self)
    223 datasets = [
    224     _open_dataset(
    225         record[self.path_column_name],
   (...)
    241     for _, record in self.df.iterrows()
    242 ]
--> 244 datasets = dask.compute(*datasets)
    245 if len(datasets) == 1:

File /envs/lib/python3.11/site-packages/dask/base.py:666, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    664     postcomputes.append(x.__dask_postcompute__())
--> 666 results = schedule(dsk, keys, **kwargs)
    667 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /envs/lib/python3.11/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads

File /envs/lib/python3.11/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)

File /envs/lib/python3.11/site-packages/dask/local.py:319, in reraise(exc, tb)
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File /envs/lib/python3.11/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
    225 id = get_id()

File /envs/lib/python3.11/site-packages/dask/core.py:121, in _execute_task(arg, cache, dsk)
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):

File /envs/lib/python3.11/site-packages/dask/utils.py:73, in apply(func, args, kwargs)
     72 if kwargs:
---> 73     return func(*args, **kwargs)
     74 else:

File /envs/lib/python3.11/site-packages/intake_esm/source.py:77, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format, storage_options)
     76 else:
---> 77     ds = xr.open_dataset(url, **xarray_open_kwargs)
     78     if preprocess is not None:

File /envs/lib/python3.11/site-packages/xarray/backends/api.py:566, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    565 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 566 backend_ds = backend.open_dataset(
    567     filename_or_obj,
    568     drop_variables=drop_variables,
    569     **decoders,
    570     **kwargs,
    571 )
    572 ds = _dataset_from_backend_dataset(
    573     backend_ds,
    574     filename_or_obj,
   (...)
    584     **kwargs,
    585 )

TypeError: NetCDF4BackendEntrypoint.open_dataset() got an unexpected keyword argument 'consolidated'

The above exception was the direct cause of the following exception:

ESMDataSourceError                        Traceback (most recent call last)
Cell In[47], line 1
----> 1 test_dsets=esm_merged_cat_test.to_dataset_dict(
      2     zarr_kwargs={"consolidated":True},
      3     cdf_kwargs={"chunks":{"time":1}}
      4 )

File /envs/lib/python3.11/site-packages/pydantic/decorator.py:40, in pydantic.decorator.validate_arguments.validate.wrapper_function()

File /envs/lib/python3.11/site-packages/pydantic/decorator.py:134, in pydantic.decorator.ValidatedFunction.call()

File /envs/lib/python3.11/site-packages/pydantic/decorator.py:206, in pydantic.decorator.ValidatedFunction.execute()

File /envs/lib/python3.11/site-packages/intake_esm/core.py:682, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    680         except Exception as exc:
    681             if not skip_on_error:
--> 682                 raise exc
    683 self.datasets = self._create_derived_variables(datasets, skip_on_error)
    684 return self.datasets

File /envs/lib/python3.11/site-packages/intake_esm/core.py:678, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    676 for task in gen:
    677     try:
--> 678         key, ds = task.result()
    679         datasets[key] = ds
    680     except Exception as exc:

File /envs/lib/python3.11/concurrent/futures/_base.py:449, in Future.result(self, timeout)
    447     raise CancelledError()
    448 elif self._state == FINISHED:
--> 449     return self.__get_result()
    451 self._condition.wait(timeout)
    453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /envs/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File /envs/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File /envs/lib/python3.11/site-packages/intake_esm/core.py:820, in _load_source(key, source)
    819 def _load_source(key, source):
--> 820     return key, source.to_dask()

File /envs/lib/python3.11/site-packages/intake_esm/source.py:277, in ESMDataSource.to_dask(self)
    275 def to_dask(self):
    276     """Return xarray object (which will have chunks)"""
--> 277     self._load_metadata()
    278     return self._ds

File /envs/lib/python3.11/site-packages/intake/source/base.py:283, in DataSourceBase._load_metadata(self)
    281 """load metadata only if needed"""
    282 if self._schema is None:
--> 283     self._schema = self._get_schema()
    284     self.dtype = self._schema.dtype
    285     self.shape = self._schema.shape

File /envs/lib/python3.11/site-packages/intake_esm/source.py:208, in ESMDataSource._get_schema(self)
    206 def _get_schema(self) -> Schema:
    207     if self._ds is None:
--> 208         self._open_dataset()
    209         metadata = {'dims': {}, 'data_vars': {}, 'coords': ()}
    210         self._schema = Schema(
    211             datashape=None,
    212             dtype=None,
   (...)
    215             extra_metadata=metadata,
    216         )

File /envs/lib/python3.11/site-packages/intake_esm/source.py:269, in ESMDataSource._open_dataset(self)
    266     self._ds.attrs[OPTIONS['dataset_key']] = self.key
    268 except Exception as exc:
--> 269     raise ESMDataSourceError(
    270         f"""Failed to load dataset with key='{self.key}'
    271          You can use `cat['{self.key}'].df` to inspect the assets/files for this key.
    272          """
    273     ) from exc

ESMDataSourceError: Failed to load dataset with key='ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'
                 You can use `cat['ScenarioMIP.MPI-ESM1-2-HR.ssp370.Amon.gn'].df` to inspect the assets/files for this key.
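
The TypeError above shows that the zarr-specific consolidated argument also reaches the netCDF backend. As the DeprecationWarning suggests, recent intake-esm versions expect a single xarray_open_kwargs argument instead. The following is only a sketch of such a call (not executed in this tutorial); it assumes that recent xarray/zarr versions detect consolidated zarr metadata automatically, so we only set the chunking:

test_dsets=esm_merged_cat_test.to_dataset_dict(
    xarray_open_kwargs={"chunks": {"time": 1}}
)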
                 

Loading fails here: the deprecated zarr_kwargs and cdf_kwargs are treated as one common set of open arguments, so the netCDF backend also receives the zarr-specific consolidated argument; the sketch above shows how this could be avoided with xarray_open_kwargs. Independent of the data access test, we can save the catalog with serialize. We will separate the catalog into two files, the database .csv.gz file and the descriptor .json file. We can do that by passing the catalog_type keyword argument:

esm_merged_cat.serialize(name="our_catalog", catalog_type="file")
Successfully wrote ESM catalog json file to: file:///builds/data-infrastructure-services/tutorials-and-use-cases/docs/source/our_catalog.json