Intake I part 3 - DKRZ Catalogs: ERA5 data#

DKRZ intake catalogs cover different projects. This notebook describes the data project ERA5 and the catalog for the ERA5 data.

Agenda

In this part, you learn

  1. what ERA5 is

  2. how to find the collection

  3. browsing through the ERA5 collection

  4. how to load ERA5 data with intake-esm

ERA5, its features and use cases#

ERA (‘ECMWF Re-Analysis’) refers to a series of climate reanalysis datasets produced at the European Centre for Medium-Range Weather Forecasts. Climate reanalyses combine observations with models to generatÏe consistent time series of multiple climate variables. ERA5 (ERA fifth generation) is the latest climate reanalysis which is produced by Copernicus Climate Change Service (C3S) at ECMWF. It replaces ERA-Interim and other predecessor ERA datasets such as, e.g., ERA-40, ERA-15 and ERA-20C.

Contracted by the German Meteorological Service, the World Data Centre for Climate (WDCC) at DKRZ is the German distributor of a selection of these data.

ERA5 is a global comprehensive reanalysis, from 1979 to near real time. The period 1959 to 1979 was only recently released and is currently being transferred to DKRZ.

Features#

  • Spatial resolution is about 31 km globally

  • Dependent on the parameter, the data are stored on a reduced Gaussian Grid (N320)
    or as spectral coefficients (with a triangular truncation of T639)

  • Provided on 137/37 different model/pressure levels

  • Temporal coverage from 1940 up to today (1959-1979 newly released)

  • Temporal resolution from hourly, daily to monthly

Use cases#

ERA5 data have a broad range of applications, some of which are

  • forcing of (regional) climate models,

  • evaluation of climate models with reanalysis,

  • comparison of weather observations to data of other scientific fields.

Further information#

Please mail to data [at] dkrz [dot] de and visit the DKRZ Webpage

Find and open the collection#

First of all, we need to import the required packages

import intake

We use intake to open the main catalog which includes all project catalogs and sub catalogs.

intake opens catalogs for data sources given in yaml format. These contain information about plugins and sources required for accessing and loading the data. The command is open_catalog:

#dkrz_catalog=intake.open_catalog(["https://dkrz.de/s/intake"])
#
#only for the web page we need to take the original link:
dkrz_catalog=intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])

Use print and list to find out what the catalog contains:

list(dkrz_catalog)
['dkrz_cmip5_archive',
 'dkrz_cmip5_disk',
 'dkrz_cmip6_cloud',
 'dkrz_cmip6_disk',
 'dkrz_cordex_disk',
 'dkrz_dyamond-winter_disk',
 'dkrz_era5_disk',
 'dkrz_monsoon_disk',
 'dkrz_mpige_disk',
 'dkrz_nextgems_disk',
 'dkrz_palmod2_disk']

We now focus on the ERA5 collection

col=dkrz_catalog.dkrz_era5_disk
/sw/spack-levante/mambaforge-22.9.0-2-Linux-x86_64-kptncg/lib/python3.10/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (13,14) have mixed types. Specify dtype option on import or set low_memory=False.
  df = pd.read_csv(
col.description
"This is an ESM collection for ERA5 data accessible on the DKRZ's disk storage system in /work/bk1099/data/"

Now, we print the variable col to see information on the data assets properties and associated metadata (e.g. which institution the data come from).

col

dkrz_era5_disk catalog with 42 dataset(s) from 4236711 asset(s):

unique
project 1
era_id 3
dataType 2
level_type 3
frequency 3
stepType 4
table_id 2
code 116
validation_date 31499
initialization_date 31500
uri 4236711
format 1
derived_short_name 0

The ERA5 catalog consists of 16 datasets from about 3,6mio assets/files.

ERA5 collection’s facets#

The ERA5 Catalog enables to browse through the data base using 10 search facets. We could group them into 4 categories:

Basic data information:

  • era_id: Three type of ERA data are available:

    • E5: ERA5 data

    • E1: ERA5.1 data (corrected ERA5 data of the years 2000 to 2006)

    • ET: ERA5T is a provisional, early-release version of the ERA5 data

  • dataType: Two data types are available: Analysis data are pure analysis and only contain intensive data (like temperature). Forecast data contain extensive data (like precipitation) which are accumulated quantities.

  • uri: Corresponds to the path on DKRZ’s HPC file system.

Information on the type of vertical level:

  • level_type: Three types are available: model_level, pressure_level or surface

Temporal information. The ERA5 database starts in January 1979 (the years until 1959 are currently being added).

  • stepType: Is the variable accumulated, instantaneous or averaged?

  • frequency: What is the temporal resolution of the data? The database contains hourly, daily and monthly data.

  • validation_date: The date when the analysis is valid.

  • initialization_date: The date when the forecast started.

Variable identifier (redundant) and attributes:

  • code : Corresponds to the GRIB code of the variable in the file.

  • table_id : Specifies which GRIB code table associated with the Grib code.

If you require more information on the variables, the catalog can be loaded with more columns. You can find out additional era5 attributes from the main catalog via:

dkrz_catalog.metadata["parameters"]["additional_era5_disk_columns"]
{'default': ['path', 'units', 'long_name', 'short_name'], 'type': 'list[str]'}

You can load these into the catalog by providing a keyword argument:

cols=dkrz_catalog._entries["dkrz_era5_disk"]._open_args["read_csv_kwargs"]["usecols"]+dkrz_catalog.metadata["parameters"]["additional_era5_disk_columns"]["default"]
col=dkrz_catalog.dkrz_era5_disk(read_csv_kwargs=dict(usecols=cols))
/sw/spack-levante/mambaforge-22.9.0-2-Linux-x86_64-kptncg/lib/python3.10/site-packages/intake_esm/cat.py:264: DtypeWarning: Columns (13,14) have mixed types. Specify dtype option on import or set low_memory=False.
  df = pd.read_csv(
  • short_name : A short identifier similar to the netCDF variable name.

  • long_name : A longer description of the variable.

  • units : The units of the variable.

We can obtain more information on the individual elements by using e.g.

uniques=col.unique()
uniques["dataType"]
['an', 'fc']

The ERA5 database has two unique dataTypes:

  • fc for forecast. All files which contain “12” in their name are forecast data.

  • an for analysis. All files which contain “00” in their name are analysis data.

uniques["frequency"]
['hourly', 'daily', 'invariant']

The ERA5 database contains data with hourly, daily and monthly frequency.

uniques["level_type"]
['model_level', 'pressure_level', 'surface']

The ERA5 database contains surface level (sfc) data. In addition, it contains vertically resolved data at model levels (ml, 137 levels) and at pressure levels (pl, 37 levels).

uniques["stepType"]
['instant', 'avg', 'max', 'accum']

The ERA5 database covers six stepTypes. This attribute is parsed from the GRIB attribute GRIB_stepType:

  • ‘accum’

  • ‘max’

  • ‘avg’

  • ‘instant’

We can check which combinations of dataType, level_type and frequency exist by using thegroupby function of the underlying dataframe:

list(col.df.groupby(["dataType", "level_type", "frequency"]).groups.keys())
[('an', 'model_level', 'hourly'),
 ('an', 'pressure_level', 'hourly'),
 ('an', 'surface', 'daily'),
 ('an', 'surface', 'hourly'),
 ('an', 'surface', 'invariant'),
 ('fc', 'surface', 'daily'),
 ('fc', 'surface', 'hourly'),
 ('fc', 'surface', 'invariant')]

Browse through the ERA5 collection#

We can search through the intake collection by using its search function. E.g., we can search for ERA5 data on pressure_level in hourly frequency by:

query=dict(level_type="pressure_level",
           frequency="hourly")
cat=col.search(**query)

The variable cat is a new sub-catalog i.e. a subset of the original catalog.
To see the variables contained in this sub-catalog, we print what unique variable long names exists :

cat.unique()["long_name"]
['Potential vorticity',
 'Specific rain water content',
 'Specific snow water content',
 'Geopotential',
 'Temperature',
 'U component of wind',
 'V component of wind',
 'Specific humidity',
 'Vertical velocity',
 'Vorticity (relative)',
 'Divergence',
 'Relative humidity',
 'Ozone mass mixing ratio',
 'Specific cloud liquid water content',
 'Specific cloud ice water content',
 'Fraction of cloud cover']

We can select a specific variable by another search, e.g. for Temperature.
We can also subset the temporal coverage that we are interested in. intake allows using wildcards in the search.
In the sub-catalog of hourly pressure level data, we can search e.g. for temperature data that are valid for January 1980 using:

temp_hourly_pl=cat.search(long_name="Temperature",
                         validation_date="1980-01.*")

We print the variable’s short name:

temp_hourly_pl.unique()["short_name"]
['t']

Open multiple ERA5 files as xarray datasets#

We can open the entire selection at once with to_dataset_dict. The result will be a dictionary of xarray datasets. For this, we have to specify a configuration for xarray via the xarray_open_kwargs argument.

cdf_kwargs={
    "engine":"cfgrib",
    "chunks":{
        "time":1
    }
}

The engine indicates what backend xarray has to use to open the files (here: cfgrib since the ERA5 data are stored in GRIB format). For high frequency 3d variables, we recommend to use small chunking of dask arrays on the time axis which can be done by setting chunks. This approach saves memory and returns futures of arrays which are only computed and loaded if needed.


This may take a while. We can ignore warnings printed by the underlying `cfgrib` library.
%%capture

temp_hourly_pl_xr_dict=temp_hourly_pl.to_dataset_dict(
    xarray_open_kwargs={
        "engine":"cfgrib",
        "chunks":{
            "time":1
        }           
    }           
)
temp_hourly_pl_xr_dict


The dictionary temp_hourly_pl_xr_dict has exactly one entry because all files of the sub-catalog temp_hourly_pl have been merged along the time axis. The default configurations that control operations on the sub-catalog can be parsed as follows:

temp_hourly_pl.esmcat.aggregation_control
AggregationControl(variable_column_name='short_name', groupby_attrs=['era_id', 'table_id', 'stepType', 'level_type', 'frequency'], aggregations=[Aggregation(type=<AggregationType.join_existing: 'join_existing'>, attribute_name='validation_date', options={'compat': 'override', 'coords': 'minimal', 'dim': 'time'}), Aggregation(type=<AggregationType.join_existing: 'join_existing'>, attribute_name='initialization_date', options={'compat': 'override', 'coords': 'minimal', 'dim': 'time'}), Aggregation(type=<AggregationType.union: 'union'>, attribute_name='short_name', options={})])


Now, let’s get our dataset and have a look. We extract the last (and only) entry from temp_hourly_pl_xr_dict using the popitem method. popitem returns a tuple of size 2. The first tuple (index 0) contains the key ‘128.0.instant.pressure_level.hourly’, the second tuple (index 1) contains the dataset:

temp_hourly_pl_xr_dset=temp_hourly_pl_xr_dict.popitem()[1]

Please note that once the method popitem is applied upon temp_hourly_pl_xr_dict, it return an empty temp_hourly_pl_xr_dict dictionary.

temp_hourly_pl_xr_dset is an xarray dataset. We can take advantage of xarray techniques to e.g.

  • select the 500hPa level and

  • calculate the mean for the selected month January

t500mean=temp_hourly_pl_xr_dset.sel(isobaricInhPa=500.,
                           method="nearest").mean(dim="time")

How does the new xarray dataset t500mean look like?

t500mean
<xarray.Dataset>
Dimensions:        (values: 542080)
Coordinates:
    number         int64 0
    step           timedelta64[ns] 00:00:00
    isobaricInhPa  float64 500.0
    latitude       (values) float64 dask.array<chunksize=(542080,), meta=np.ndarray>
    longitude      (values) float64 dask.array<chunksize=(542080,), meta=np.ndarray>
Dimensions without coordinates: values
Data variables:
    t              (values) float32 dask.array<chunksize=(542080,), meta=np.ndarray>

We see that the values of the data variable t are given as dask.array.
Using compute, we manually trigger loading the data of this dataset into memory and to return a new dataset.

t500mean.compute()
<xarray.Dataset>
Dimensions:        (values: 542080)
Coordinates:
    number         int64 0
    step           timedelta64[ns] 00:00:00
    isobaricInhPa  float64 500.0
    latitude       (values) float64 89.78 89.78 89.78 ... -89.78 -89.78 -89.78
    longitude      (values) float64 0.0 20.0 40.0 60.0 ... 300.0 320.0 340.0
Dimensions without coordinates: values
Data variables:
    t              (values) float32 233.1 233.1 233.1 ... 238.0 238.0 238.0

Plotting the data with the plot function shows the zonal gradient of 500 hPa temperature (in K) in January 1980. The x-axis is a proxy for the latitude (North->South direction). The figure reflects mid-atmosphere temperature (500 hPa) strongly increases from the poles towards the Equator.

t500mean.t.plot()
[<matplotlib.lines.Line2D at 0x7ffe545a6ef0>]
_images/388e4fcebb56beaf8efdb7afa87c1118d013fa3e1b7e16bfbcec206918d10870.png

P.S.: For Forecast variables, we recommend to define and use the following preprocessing function so that you generate a single time axis on the fly. You can apply this function by adding the preprocess=pp keyword argument to the to_dataset_dict function.

def pp(ds):
    if "depthBelowLandLayer" in ds.coords:
        del ds["depthBelowLandLayer"]
    import datetime as dt
    if "step" in ds.dims:
        corrected_list = []
        time0=ds.isel(time=0)["time"]
        time0hour=ds.isel(time=0)["time.hour"]
        timegroups=ds.groupby('time')
        for idx, (label,group) in enumerate(timegroups) :
            group["step"]=group["step"]+(group["time"]-time0)
            #default: 18h
            begin_step_hour=6
            end_step_hour=30

            if time0hour == 6 and time0.dt.year!=1940 :
                begin_step_hour=18
                end_step_hour=42
            elif time0hour == 6 and time0.dt.year==1940 :
                begin_step_hour=0
                end_step_hour=18

            group=group.where(
                (group["step"].dt.seconds/60/60+group["step"].dt.days*24)>begin_step_hour-1,
                drop=True
            )
            group=group.where(
                (group["step"].dt.seconds/60/60+group["step"].dt.days*24)<end_step_hour,
                drop=True
            )

            corrected_list.append(group)
        corrected_dset=xr.concat(corrected_list,dim="step")
        del corrected_dset["time"]
        return corrected_dset.rename(
            dict(
                #valid_time="time",
                step="time",
            )
        ).set_index(time="valid_time")
    return ds