Source code for moniplot.image_to_xarray

#
# https://github.com/rmvanhees/moniplot.git
#
# Copyright (c) 2022 SRON - Netherlands Institute for Space Research
#
# License:  GPLv3
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
This module contains the routines `h5_to_xr` and `data_to_xr`.
These functions store a HDF5 dataset or numpy array in a labeled array
(class `xarray.DataArray`).
"""
from __future__ import annotations

__all__ = ['h5_to_xr', 'data_to_xr']

from pathlib import PurePath

import h5py
import numpy as np
import xarray as xr


# - local functions --------------------------------
def __get_attrs(dset: h5py.Dataset, field: str) -> dict:
    """Return attributes of the HDF5 dataset.

    Parameters
    ----------
    dset :  h5py.Dataset
       HDF5 dataset from which the attributes are read
    field : str
       Name of field in compound dataset

    Returns
    -------
    dict with numpy arrays
    """
    _field = None
    if field is not None:
        try:
            _field = {'name': field,
                      'oneof': len(dset.dtype.names),
                      'index': dset.dtype.names.index(field)}
        except Exception as exc:
            raise RuntimeError(
                f'field {field} not found in dataset {dset.name}') from exc
        # print('_field ', _field)

    attrs = {}
    for key in dset.attrs:
        if key in ('CLASS', 'DIMENSION_LIST', 'NAME', 'REFERENCE_LIST',
                   '_Netcdf4Dimid', '_Netcdf4Coordinates'):
            continue

        attr_value = dset.attrs[key]
        # print('# ----- ', key, type(attr_value), attr_value)
        if isinstance(attr_value, np.ndarray):
            if len(attr_value) == 1:
                attr_value = attr_value[0]
                # print('# ----- ', key, type(attr_value), attr_value)
            elif _field is not None:
                if len(attr_value) == _field['oneof']:
                    attr_value = attr_value[_field['index']]
                # elif isinstance(attr_value, np.void):
                #    attr_value = attr_value[0]

        attrs[key] = (attr_value.decode('ascii')
                      if isinstance(attr_value, bytes) else attr_value)

    return attrs


def __get_coords(dset: h5py.Dataset, data_sel: tuple[slice | int]) -> list:
    r"""Return coordinates of the HDF5 dataset with dimension scales.

    Parameters
    ----------
    dset :  h5py.Dataset
       HDF5 dataset from which the data is read
    data_sel :  tuple of slice or int
       A numpy slice generated for example `numpy.s\_`

    Returns
    -------
    A sequence of tuples [(dims, data), ...]
    """
    coords = []
    if len(dset.dims) == dset.ndim:
        try:
            for ii, dim in enumerate(dset.dims):
                # get name of dimension
                name = PurePath(dim[0].name).name
                if name.startswith('row') or name.startswith('column'):
                    name = name.split(' ')[0]

                # determine coordinate
                buff = None
                if dim[0].size > 0 and not np.all(dim[0][()] == 0):
                    buff = dim[0][()]
                elif name in ('row', 'column'):
                    d_type = 'u2' if ((dset.shape[ii]-1) >> 16) == 0 else 'u4'
                    buff = np.arange(dset.shape[ii], dtype=d_type)

                if not (buff is None or data_sel is None):
                    buff = buff[data_sel[ii]]

                coords.append((name, buff))
        except RuntimeError:
            coords = []

    return coords


def __set_coords(dset: h5py.Dataset | np.ndarray,
                 data_sel: tuple[slice | int] | None,
                 dims: list | None) -> list:
    r"""Set coordinates of the HDF5 dataset.

    Parameters
    ----------
    dset :  h5py.Dataset or np.ndarray
       HDF5 dataset from which the data is read, or numpy array
    data_sel :  tuple of slice or int
       A numpy slice generated for example `numpy.s\_`
    dims : list of strings
       Alternative names for the dataset dimensions if not attached to dataset
       Default coordinate names are ['time', ['row', ['column']]]

    Returns
    -------
    A sequence of tuples [(dims, data), ...]
    """
    if dims is None:
        if dset.ndim > 3:
            raise ValueError('not implemented for ndim > 3')

        dims = ['time', 'row', 'column'][-dset.ndim:]

    coords = []
    for ii in range(dset.ndim):
        co_dtype = 'u2' if ((dset.shape[ii]-1) >> 16) == 0 else 'u4'
        buff = np.arange(dset.shape[ii], dtype=co_dtype)
        if data_sel is not None:
            buff = buff[data_sel[ii]]
        coords.append((dims[ii], buff))

    return coords


def __get_data(dset: h5py.Dataset, data_sel: tuple[slice | int] | None,
               field: str) -> np.ndarray:
    r"""Return data of the HDF5 dataset.

    Parameters
    ----------
    dset :  h5py.Dataset
       HDF5 dataset from which the data is read
    data_sel :  tuple of slice or int
       A numpy slice generated for example `numpy.s\_`
    field : str
       Name of field in compound dataset or None

    Returns
    -------
    Numpy array

    Notes
    -----
    Read floats always as doubles
    """
    if data_sel is None:
        data_sel = ()

    if np.issubdtype(dset.dtype, np.floating):
        data = dset.astype(float)[data_sel]
        data[data == float.fromhex('0x1.ep+122')] = np.nan
        return data

    if field is None:
        return dset[data_sel]

    data = dset.fields(field)[data_sel]
    if np.issubdtype(data.dtype, np.floating):
        data = data.astype(float)
        data[data == float.fromhex('0x1.ep+122')] = np.nan
    return data


def __check_selection(data_sel: slice | tuple | int,
                      ndim: int) -> slice | tuple | None:
    r"""Check and correct user provided data selection.

    Notes
    -----
    If data_sel is used to select data from a dataset then the number of
    dimensions of data_sel should agree with the HDF5 dataset or one and
    only one Ellipsis has to be used.
    Thus allowed values for data_sel are:
    * [always]: (), np.s\_[:], np.s\_[...]
    * [1-D dataset]: np.s\_[:-1], np.s\_[0]
    * [2-D dataset]: np.s\_[:-1, :], np.s\_[0, :], np.s\_[:-1, 0]
    * [3-D dataset]: np.s\_[:-1, :, 2:4], np.s\_[0, :, :], np.s\_[:-1, 0, 2:4]
    * [Ellipsis] np.s\_[0, ...], np.s\_[..., 4], np.s\_[0, ..., 4]
    """
    if data_sel in (np.s_[:], np.s_[...], np.s_[()]):
        return None

    if np.isscalar(data_sel):
        return np.s_[data_sel:data_sel+1]

    buff = ()
    for val in data_sel:
        if val == Ellipsis:
            for _ in range(ndim - len(data_sel) + 1):
                buff += np.index_exp[:]
        elif np.isscalar(val):
            buff += (np.s_[val:val+1],)
        else:
            buff += (val,)

    return buff


# - main function ----------------------------------
[docs]def h5_to_xr(h5_dset: h5py.Dataset, data_sel: tuple[slice | int] | None = None, *, dims: list[str] | None = None, field: str | None = None) -> xr.DataArray: r"""Create xarray.DataArray from a HDF5 dataset (with dimension scales). Implements a lite interface with the xarray.DataArray, should work for all 2-D detector images, sequences of detector measurements and trend data. Parameters ---------- h5_dset : h5py.Dataset Data, dimensions, coordinates and attributes are read for this dataset data_sel : tuple of slice or int, optional A numpy slice generated for example `numpy.s\_` dims : list of strings, optional Alternative names for the dataset dimensions if not attached to dataset field : str, optional Name of field in compound dataset or None Returns ------- xarray.DataArray Notes ----- All floating datasets are converted to Python type 'float' Dimensions and Coordinates: * The functions in this module should work with netCDF4 and HDF5 files. * In a HDF5 file the 'coordinates' of a dataset can be defined using \ dimension scales. * In a netCDF4 file this is required: all variables have dimensions, \ which can have coordinates. But under the hood also netCDF4 uses \ dimension scales. * The xarray DataArray structure will have as dimensions, the names of \ the dimension scales and as coordinates the names and data of the \ dimensions scales, except when the data only contains zero's. * The default dimensions of an image are 'row' and 'column' with evenly \ spaced values created with np.arange(len(dim), dtype=uint). Examples -------- Read HDF5 dataset 'signal' from file:: > fid = h5py.File(flname, 'r') # flname is a HDF5/netCDF4 file > xdata = h5_to_xr(fid['signal']) > fid.close() Combine Tropomi SWIR data of band7 and band8:: > fid = h5py.File(s5p_b7_prod, 'r') # Tropomi band7 product > xdata7 = h5_to_xr(fid['signal']) > fid.close() > fid = h5py.File(s5p_b8_prod, 'r') # Tropomi band8 product > xdata8 = h5_to_xr(fid['signal']) > fid.close() > xdata = xr.concat((xdata7, xdata8), dim='spectral_channel') Optionally, fix the 'column' dimension:: > xdata = xdata.assign_coords( > ... column = np.arange(xdata.column.size, dtype='u4')) """ # Check data selection if data_sel is not None: data_sel = __check_selection(data_sel, h5_dset.ndim) # Name of this array name = PurePath(h5_dset.name).name # Values for this array data = __get_data(h5_dset, data_sel, field) # Coordinates (tick labels) to use for indexing along each dimension coords = [] if dims is None: coords = __get_coords(h5_dset, data_sel) if not coords: coords = __set_coords(h5_dset, data_sel, dims) # - check if dimension of dataset and coordinates agree if data.ndim < len(coords): for ii in reversed(range(len(coords))): if np.isscalar(coords[ii][1]): del coords[ii] # - remove empty coordinates dims = [] co_dict = {} for key, val in coords: dims.append(key) if val is not None: co_dict[key] = val # Attributes to assign to the array attrs = __get_attrs(h5_dset, field) return xr.DataArray(data, coords=co_dict, dims=dims, name=name, attrs=attrs)
[docs]def data_to_xr(data: np.ndarray, *, dims: list[str] | None = None, name: str | None = None, long_name: str | None = None, units: str | None = None) -> xr.DataArray: """Create xarray.DataArray from a dataset. Implements a lite interface with the xarray.DataArray, should work for all 2-D detector images, sequences of detector measurements and trend data. Parameters ---------- data : np.ndarray Data to be stored in the xarray dims : list of strings, optional Names for the dataset dimensions name : str, optional A string that names the instance units : str, optional Units of the data, default: '1' long_name : str, optional Long name describing the data, default: empty string Returns ------- xarray.DataArray Notes ----- All floating datasets are converted to Python type 'float' """ coords = __set_coords(data, None, dims) attrs = {'units': '1' if units is None else units, 'long_name': '' if long_name is None else long_name} return xr.DataArray(data, coords=coords, name=name, attrs=attrs)