Source code for xpublish.plugins.included.zarr

import logging
from typing import Sequence

import cachey  # type: ignore
import xarray as xr
from fastapi import APIRouter, Depends, HTTPException, Path
from starlette.responses import Response  # type: ignore

from xpublish.utils.api import JSONResponse

from ...utils.api import DATASET_ID_ATTR_KEY
from ...utils.cache import CostTimer
from ...utils.zarr import (
    ZARR_METADATA_KEY,
    array_meta_key,
    attrs_key,
    encode_chunk,
    get_data_chunk,
    get_zmetadata,
    get_zvariables,
    group_meta_key,
    jsonify_zmetadata,
)

# type: ignore
from .. import Dependencies, Plugin, hookimpl

logger = logging.getLogger('zarr_api')


[docs] class ZarrPlugin(Plugin): """Adds Zarr-like accessing endpoints for datasets.""" name: str = 'zarr' dataset_router_prefix: str = '/zarr' dataset_router_tags: Sequence[str] = ['zarr']
[docs] @hookimpl def dataset_router(self, deps: Dependencies) -> APIRouter: # noqa: D102 router = APIRouter( prefix=self.dataset_router_prefix, tags=list(self.dataset_router_tags), ) @router.get(f'/{ZARR_METADATA_KEY}') def get_zarr_metadata( dataset=Depends(deps.dataset), cache=Depends(deps.cache), ) -> dict: """Consolidated Zarr metadata.""" zvariables = get_zvariables(dataset, cache) zmetadata = get_zmetadata(dataset, cache, zvariables) zjson = jsonify_zmetadata(dataset, zmetadata) return JSONResponse(zjson) @router.get(f'/{group_meta_key}') def get_zarr_group( dataset=Depends(deps.dataset), cache=Depends(deps.cache), ) -> dict: """Zarr group data.""" zvariables = get_zvariables(dataset, cache) zmetadata = get_zmetadata(dataset, cache, zvariables) return JSONResponse(zmetadata['metadata'][group_meta_key]) @router.get(f'/{attrs_key}') def get_zarr_attrs( dataset=Depends(deps.dataset), cache=Depends(deps.cache), ) -> dict: """Zarr attributes.""" zvariables = get_zvariables(dataset, cache) zmetadata = get_zmetadata(dataset, cache, zvariables) return JSONResponse(zmetadata['metadata'][attrs_key]) @router.get('/{var}/{chunk}') def get_variable_chunk( var: str = Path(description='Variable in dataset'), chunk: str = Path(description='Zarr chunk'), dataset: xr.Dataset = Depends(deps.dataset), cache: cachey.Cache = Depends(deps.cache), ): """Get a zarr array chunk. This will return cached responses when available. """ zvariables = get_zvariables(dataset, cache) zmetadata = get_zmetadata(dataset, cache, zvariables) # First check that this request wasn't for variable metadata if array_meta_key in chunk: return zmetadata['metadata'][f'{var}/{array_meta_key}'] elif attrs_key in chunk: return JSONResponse(zmetadata['metadata'][f'{var}/{attrs_key}']) elif group_meta_key in chunk: raise HTTPException(status_code=404, detail='No subgroups') else: logger.debug('var is %s', var) logger.debug('chunk is %s', chunk) cache_key = dataset.attrs.get(DATASET_ID_ATTR_KEY, '') + '/' + f'{var}/{chunk}' response = cache.get(cache_key) if response is None: with CostTimer() as ct: arr_meta = zmetadata['metadata'][f'{var}/{array_meta_key}'] da = zvariables[var].data data_chunk = get_data_chunk( da, chunk, out_shape=arr_meta['chunks'], ) echunk = encode_chunk( data_chunk.tobytes(), filters=arr_meta['filters'], compressor=arr_meta['compressor'], ) response = Response( echunk, media_type='application/octet-stream', ) cache.put(cache_key, response, ct.time, len(echunk)) return response return router