Source code for xpublish.rest

import cachey
import uvicorn
import xarray as xr
from fastapi import FastAPI, HTTPException

from .dependencies import get_cache, get_dataset, get_dataset_ids
from .routers import base_router, common_router, dataset_collection_router, zarr_router
from .utils.api import (
    SingleDatasetOpenAPIOverrider,
    check_route_conflicts,
    normalize_app_routers,
    normalize_datasets,
)


def _dataset_from_collection_getter(datasets):
    """Used to override the get_dataset FastAPI dependency in case where
    a collection of datasets is being served.

    """

    def get_dataset(dataset_id: str):
        if dataset_id not in datasets:
            raise HTTPException(status_code=404, detail=f"Dataset '{dataset_id}' not found")

        return datasets[dataset_id]

    return get_dataset


def _dataset_unique_getter(dataset):
    """Used to override the get_dataset FastAPI dependency in case where
    only one dataset is being served, e.g., via the 'rest' accessor.

    """

    def get_dataset():
        return dataset

    return get_dataset


def _set_app_routers(dataset_routers=None, dataset_route_prefix=''):

    app_routers = []

    # top-level api endpoints
    app_routers.append((common_router, {}))

    if dataset_route_prefix:
        app_routers.append((dataset_collection_router, {'tags': ['info']}))

    # dataset-specifc api endpoints
    if dataset_routers is None:
        dataset_routers = [
            (base_router, {'tags': ['info']}),
            (zarr_router, {'tags': ['zarr']}),
        ]

    app_routers += normalize_app_routers(dataset_routers, dataset_route_prefix)

    check_route_conflicts(app_routers)

    return app_routers


[docs]class Rest: """Used to publish one or more Xarray Datasets via a REST API (FastAPI application). To publish a single dataset via its own FastAPI application, you might want to use the :attr:`xarray.Dataset.rest` accessor instead for more convenience. It provides the same interface than this class. Parameters ---------- datasets : :class:`xarray.Dataset` or dict A single :class:`xarray.Dataset` object or a mapping of datasets objects to be served. If a mapping is given, keys are used as dataset ids and are converted to strings. See also the notes below. routers : list, optional A list of dataset-specific :class:`fastapi.APIRouter` instances to include in the fastAPI application. If None, the default routers will be included. The items of the list may also be tuples with the following format: ``[(router1, {'prefix': '/foo', 'tags': ['foo', 'bar']})]``, where the 1st tuple element is a :class:`fastapi.APIRouter` instance and the 2nd element is a dictionary that is used to pass keyword arguments to :meth:`fastapi.FastAPI.include_router`. cache_kws : dict, optional Dictionary of keyword arguments to be passed to :meth:`cachey.Cache.__init__()`. By default, the cache size is set to 1MB, but this can be changed with ``available_bytes``. app_kws : dict, optional Dictionary of keyword arguments to be passed to :meth:`fastapi.FastAPI.__init__()`. Notes ----- The urls of the application's API endpoints differ whether a single dataset or a mapping (collection) of datasets is given. In the latter case, all dataset-specific endpoint urls have the prefix ``/datasets/{dataset_id}``, where ``{dataset_id}`` corresponds to the keys of the mapping (converted to strings). Still in the latter case, the endpoint ``/datasets`` is added and returns the list of all dataset ids. """
[docs] def __init__(self, datasets, routers=None, cache_kws=None, app_kws=None): self._datasets = normalize_datasets(datasets) if not self._datasets: # publish single dataset self._get_dataset_func = _dataset_unique_getter(datasets) dataset_route_prefix = '' else: self._get_dataset_func = _dataset_from_collection_getter(self._datasets) dataset_route_prefix = '/datasets/{dataset_id}' self._app_routers = _set_app_routers(routers, dataset_route_prefix) self._app = None self._app_kws = {} if app_kws is not None: self._app_kws.update(app_kws) self._cache = None self._cache_kws = {'available_bytes': 1e6} if cache_kws is not None: self._cache_kws.update(cache_kws)
@property def cache(self) -> cachey.Cache: """Returns the :class:`cachey.Cache` instance used by the FastAPI application.""" if self._cache is None: self._cache = cachey.Cache(**self._cache_kws) return self._cache def _init_app(self): """Initiate the FastAPI application.""" self._app = FastAPI(**self._app_kws) for rt, kwargs in self._app_routers: self._app.include_router(rt, **kwargs) self._app.dependency_overrides[get_dataset_ids] = lambda: list(self._datasets) self._app.dependency_overrides[get_dataset] = self._get_dataset_func self._app.dependency_overrides[get_cache] = lambda: self.cache if not self._datasets: # fix openapi spec for single dataset self._app.openapi = SingleDatasetOpenAPIOverrider(self._app).openapi return self._app @property def app(self) -> FastAPI: """Returns the :class:`fastapi.FastAPI` application instance.""" if self._app is None: self._app = self._init_app() return self._app
[docs] def serve(self, host='0.0.0.0', port=9000, log_level='debug', **kwargs): """Serve this FastAPI application via :func:`uvicorn.run`. Parameters ---------- host : str Bind socket to this host. port : int Bind socket to this port. log_level : str App logging level, valid options are {'critical', 'error', 'warning', 'info', 'debug', 'trace'}. **kwargs : Additional arguments to be passed to :func:`uvicorn.run`. Notes ----- This method is blocking and does not return. """ uvicorn.run(self.app, host=host, port=port, log_level=log_level, **kwargs)
@xr.register_dataset_accessor('rest') class RestAccessor: """REST API Accessor for serving one dataset in its dedicated FastAPI application. """ def __init__(self, xarray_obj): self._obj = xarray_obj self._rest = None self._initialized = False def _get_rest_obj(self): if self._rest is None: self._rest = Rest(self._obj) return self._rest def __call__(self, **kwargs): """Initialize this accessor by setting optional configuration values. Parameters ---------- **kwargs Arguments passed to :func:`xpublish.Rest.__init__`. Notes ----- This method can only be invoked once. """ if self._initialized: raise RuntimeError('This accessor has already been initialized') self._initialized = True self._rest = Rest(self._obj, **kwargs) return self @property def cache(self) -> cachey.Cache: """Returns the :class:`cachey.Cache` instance used by the FastAPI application.""" return self._get_rest_obj().cache @property def app(self) -> FastAPI: """Returns the :class:`fastapi.FastAPI` application instance.""" return self._get_rest_obj().app def serve(self, **kwargs): """Serve this FastAPI application via :func:`uvicorn.run`. Parameters ---------- **kwargs : Arguments passed to :func:`xpublish.Rest.serve`. Notes ----- This method is blocking and does not return. """ self._get_rest_obj().serve(**kwargs)