Source code for xpublish.rest

from typing import (
    Dict,
    List,
    Literal,
    Optional,
    Tuple,
    Union,
)

import cachey
import pluggy
import uvicorn
import xarray as xr
from fastapi import (
    APIRouter,
    FastAPI,
    HTTPException,
    Path,
)

from .dependencies import (
    get_cache,
    get_dataset,
    get_dataset_ids,
    get_plugin_manager,
)
from .plugins import (
    Dependencies,
    Plugin,
    PluginSpec,
    get_plugins,
    load_default_plugins,
)
from .routers import dataset_collection_router
from .utils.api import (
    DATASET_ID_ATTR_KEY,
    SingleDatasetOpenAPIOverrider,
    check_route_conflicts,
    normalize_app_routers,
    normalize_datasets,
)

# Keyword arguments that accompany a router; they are unpacked into
# ``FastAPI.include_router`` when the application is built.
RouterKwargs = Dict
# A router paired with the keyword arguments used to include it in the app.
RouterAndKwargs = Tuple[APIRouter, RouterKwargs]
# Log level names accepted by ``Rest.serve`` and forwarded to ``uvicorn.run``.
LogLevels = Literal['critical', 'error', 'warning', 'info', 'debug', 'trace']


class Rest:
    """Used to publish multiple Xarray Datasets via a REST API (FastAPI application).

    To publish a single dataset via its own FastAPI application, you
    might want to use the :attr:`xarray.Dataset.rest` accessor for
    more convenience. Additionally the :class:`xpublish.SingleDatasetRest`
    class has a simplified interface for single dataset access.

    NOTE:
        The urls of the application's API endpoints differ between this class
        and :class:`xpublish.SingleDatasetRest`. With this class, all
        dataset-specific endpoint urls have the prefix
        ``/datasets/{dataset_id}``, where ``{dataset_id}`` corresponds to the
        keys of the mapping (converted to strings). The endpoint ``/datasets``
        is also added and returns the list of all dataset ids.
    """

    def __init__(
        self,
        datasets: Optional[Dict[str, xr.Dataset]] = None,
        routers: Optional[List[APIRouter]] = None,
        cache_kws: Optional[Dict] = None,
        app_kws: Optional[Dict] = None,
        plugins: Optional[Dict[str, Plugin]] = None,
    ):
        """Initialize a REST object for publishing Xarray Datasets.

        Args:
            datasets: A mapping of datasets objects to be served. Keys are
                used as dataset ids and are converted to strings. See also
                the note on endpoint urls in the class docstring.
            routers: A list of dataset-specific :class:`fastapi.APIRouter`
                instances to include in the fastAPI application. These routers
                are in addition to any loaded via plugins.
                The items of the list may also be tuples with the following
                format: ``[(router1, {'prefix': '/foo', 'tags': ['foo', 'bar']})]``,
                where the 1st tuple element is a :class:`fastapi.APIRouter`
                instance and the 2nd element is a dictionary that is used to
                pass keyword arguments to :meth:`fastapi.FastAPI.include_router`.
            cache_kws: Dictionary of keyword arguments to be passed to
                :meth:`cachey.Cache.__init__()`.
                By default, the cache size is set to 1MB, but this can be
                changed with ``available_bytes``.
            app_kws: Dictionary of keyword arguments to be passed to
                :meth:`fastapi.FastAPI.__init__()`.
            plugins: Optional dictionary of loaded, configured plugins.
                Overrides automatic loading of plugins.
                If no plugins are desired, set to an empty dict.

        Raises:
            TypeError: If ``datasets`` is a single :class:`xarray.Dataset`
                rather than a mapping of datasets.
        """
        if isinstance(datasets, xr.Dataset):
            raise TypeError(
                'xpublish.Rest no longer directly handles single datasets. '
                'Please use xpublish.SingleDatasetRest instead'
            )

        # Datasets first: setup_datasets() defines the route prefix that the
        # router normalization below depends on.
        self.setup_datasets(datasets or {})
        self.setup_plugins(plugins)

        normalized_routers: List[Tuple[APIRouter, Dict]] = normalize_app_routers(
            routers or [],
            self._dataset_route_prefix,
        )
        check_route_conflicts(normalized_routers)
        self._routers = normalized_routers

        self.init_app_kwargs(app_kws)
        self.init_cache_kwargs(cache_kws)

    def setup_datasets(self, datasets: Dict[str, xr.Dataset]) -> str:
        """Initialize datasets and dataset accessor function.

        Args:
            datasets: Dictionary of datasets to serve with their names as keys.

        Returns:
            Prefix for dataset routers
        """
        self._datasets = normalize_datasets(datasets)
        self._get_dataset_func = self.get_dataset_from_plugins
        self._dataset_route_prefix = '/datasets/{dataset_id}'
        return self._dataset_route_prefix

    def get_datasets_from_plugins(self) -> List[str]:
        """Return dataset ids from directly loaded datasets and plugins.

        Used as a FastAPI dependency in dataset router plugins
        via :meth:`Rest.dependencies`.

        Returns:
            Dataset IDs from plugins and datasets loaded into
            :class:`xpublish.Rest` at initialization. Directly loaded ids
            come first, followed by the ids reported by each plugin.
        """
        dataset_ids = list(self._datasets)

        for plugin_dataset_ids in self.pm.hook.get_datasets():
            dataset_ids.extend(plugin_dataset_ids)

        return dataset_ids

    def get_dataset_from_plugins(
        self,
        dataset_id: str = Path(description='Unique ID of dataset'),
    ) -> xr.Dataset:
        """Attempts to load dataset from plugins.

        Otherwise return dataset from passed in dictionary of datasets.

        Args:
            dataset_id: Unique key of dataset to attempt to load from plugins
                or those provided to :class:`xpublish.Rest` at initialization.

        Returns:
            Dataset for selected ``dataset_id``. A dataset supplied by a plugin
            gets its dataset-id attribute set to ``dataset_id`` if the
            attribute is missing or ``None``.

        Raises:
            FastAPI.HTTPException: When a dataset is not found a 404 error is returned.
        """
        dataset = self.pm.hook.get_dataset(dataset_id=dataset_id)

        # Compare against None explicitly: the truth value of an
        # xarray.Dataset reflects whether it has data variables, so a valid
        # dataset holding only coordinates/attributes would otherwise be
        # treated as "not provided by any plugin".
        # NOTE(review): presumes the hook yields None when no plugin supplies
        # the dataset (first-result hook) -- confirm against PluginSpec.
        if dataset is not None:
            if dataset.attrs.get(DATASET_ID_ATTR_KEY) is None:
                dataset.attrs[DATASET_ID_ATTR_KEY] = dataset_id
            return dataset

        if dataset_id not in self._datasets:
            raise HTTPException(status_code=404, detail=f"Dataset '{dataset_id}' not found")

        return self._datasets[dataset_id]

    def setup_plugins(
        self,
        plugins: Optional[Dict[str, Plugin]] = None,
    ) -> None:
        """Initialize and load plugins from entry_points unless explicitly provided.

        Args:
            plugins: A dictionary of initialized plugins.
                If provided, then the automatic loading of plugins is disabled.
                Providing an empty dictionary will also disable automatic
                loading of plugins.
        """
        if plugins is None:
            plugins = load_default_plugins()

        self.pm = pluggy.PluginManager('xpublish')
        self.pm.add_hookspecs(PluginSpec)

        for name, plugin in plugins.items():
            self.pm.register(plugin, name=name)

        # Plugins may contribute additional hook specifications of their own.
        for hookspec in self.pm.hook.register_hookspec():
            self.pm.add_hookspecs(hookspec)

    def register_plugin(
        self,
        plugin: Plugin,
        plugin_name: Optional[str] = None,
        overwrite: bool = False,
    ) -> None:
        """Register a plugin with the xpublish system.

        Args:
            plugin: Instantiated Plugin object.
            plugin_name: Plugin name. Defaults to ``plugin.name``.
            overwrite: If a plugin of the same name exist,
                setting this to True will remove the existing plugin before
                registering the new plugin. Defaults to False.

        Raises:
            AttributeError: Plugin can not be registered.
            ValueError: Plugin already registered, try setting overwrite to True.
        """
        try:
            plugin_name = plugin_name or plugin.name

            if overwrite is True and plugin_name in dict(self.pm.list_name_plugin()):
                # If a plugin exist with the same name, unregister it.
                # If configured using entry_points, the name of the
                # entry_point should be the same as the plugin.name.
                self.pm.unregister(name=plugin_name)

            # Snapshot the plugins registered *before* the new one, so the
            # hook call below can be restricted to the new plugin only.
            existing_plugins = self.pm.get_plugins()
            self.pm.register(plugin, plugin_name)
        except AttributeError as e:
            raise AttributeError(
                f'Plugin {plugin} is likely not initialized before registration'
            ) from e

        # Only collect hook specifications from the newly registered plugin.
        for hookspec in self.pm.subset_hook_caller(
            'register_hookspec', remove_plugins=existing_plugins
        )():
            self.pm.add_hookspecs(hookspec)

    def init_cache_kwargs(self, cache_kws: Union[dict, None]) -> None:
        """Set up cache kwargs.

        Resets any existing cache instance; the cache is created lazily by
        the :attr:`cache` property.

        Args:
            cache_kws: Dictionary of cache keyword arguments. Entries override
                the default ``available_bytes`` of ``1e6``.
        """
        self._cache = None
        self._cache_kws = {'available_bytes': 1e6}
        if cache_kws is not None:
            self._cache_kws.update(cache_kws)

    def init_app_kwargs(self, app_kws: Union[dict, None]) -> None:
        """Set up FastAPI application kwargs.

        Resets any existing application instance; the application is created
        lazily by the :attr:`app` property.

        Args:
            app_kws: Dictionary of FastAPI application keyword arguments.
        """
        self._app = None
        self._app_kws = {}
        if app_kws is not None:
            self._app_kws.update(app_kws)

    @property
    def cache(self) -> cachey.Cache:
        """Returns the :class:`cachey.Cache` instance used by the FastAPI application."""
        if self._cache is None:
            self._cache = cachey.Cache(**self._cache_kws)
        return self._cache

    @property
    def plugins(self) -> Dict[str, Plugin]:
        """Returns the loaded plugins."""
        return dict(self.pm.list_name_plugin())

    def _init_routers(self, dataset_routers: Optional[List[RouterAndKwargs]]) -> None:
        """Setup plugin and dataset routers.

        Needs to run after dataset and plugin setup.

        Args:
            dataset_routers: Dataset-specific ``(router, kwargs)`` pairs to
                include in addition to the routers provided by plugins.
        """
        app_routers, plugin_dataset_routers = self.plugin_routers()

        # An empty prefix means a single dataset is served, in which case the
        # dataset collection router is not added.
        if self._dataset_route_prefix:
            app_routers.append((dataset_collection_router, {'tags': ['info']}))

        # NOTE(review): the routers stored by __init__ were already passed
        # through normalize_app_routers() once -- confirm that a second pass
        # with the same prefix is intended.
        app_routers.extend(
            normalize_app_routers(
                plugin_dataset_routers + (dataset_routers or []), self._dataset_route_prefix
            )
        )

        check_route_conflicts(app_routers)

        self._app_routers = app_routers

    def plugin_routers(self) -> Tuple[List[RouterAndKwargs], List[RouterAndKwargs]]:
        """Load the app and dataset routers for plugins.

        Returns:
            A tuple containing a list of top-level routers from plugins
            and a list of per-dataset routers from plugins
        """
        deps = self.dependencies()

        # Each router gets its own (initially empty) kwargs dictionary.
        app_routers = [(router, {}) for router in self.pm.hook.app_router(deps=deps)]
        dataset_routers = [(router, {}) for router in self.pm.hook.dataset_router(deps=deps)]

        return app_routers, dataset_routers

    def dependencies(self) -> Dependencies:
        """FastAPI dependencies to pass to plugin router methods.

        Returns:
            initialized :class:`xpublish.plugins.Dependencies` object.
        """
        deps = Dependencies(
            dataset_ids=self.get_datasets_from_plugins,
            dataset=self._get_dataset_func,
            # Lambdas defer attribute access until the dependency is resolved.
            cache=lambda: self.cache,
            plugins=lambda: self.plugins,
            plugin_manager=lambda: self.pm,
        )

        return deps

    def _init_dependencies(self) -> None:
        """Initialize dependencies.

        Overrides the placeholder dependency functions on the application
        with the callables bound to this instance.
        """
        deps = self.dependencies()

        self._app.dependency_overrides[get_dataset_ids] = deps.dataset_ids
        self._app.dependency_overrides[get_dataset] = deps.dataset
        self._app.dependency_overrides[get_cache] = deps.cache
        self._app.dependency_overrides[get_plugins] = deps.plugins
        self._app.dependency_overrides[get_plugin_manager] = deps.plugin_manager

    def _init_app(self) -> FastAPI:
        """Initiate the FastAPI application.

        Returns:
            FastAPI application instance.
        """
        self._app = FastAPI(**self._app_kws)

        self._init_routers(self._routers)
        for rt, kwargs in self._app_routers:
            self._app.include_router(rt, **kwargs)

        self._init_dependencies()

        return self._app

    @property
    def app(self) -> FastAPI:
        """Returns the :class:`fastapi.FastAPI` application instance.

        NOTE:
            Plugins registered with :meth:`xpublish.Rest.register_plugin`
            after :meth:`xpublish.Rest.app` is accessed or
            :meth:`xpublish.Rest.serve` is called once may not take effect.
        """
        if self._app is None:
            self._app = self._init_app()
        return self._app

    def serve(
        self,
        host: Optional[str] = '0.0.0.0',
        port: Optional[int] = 9000,
        log_level: Optional[LogLevels] = 'debug',
        **kwargs,
    ) -> None:
        """Serve this FastAPI application via :func:`uvicorn.run`.

        NOTE:
            This method is blocking and does not return.

        Args:
            host: Bind socket to this host.
            port: Bind socket to this port.
            log_level: App logging level, valid options are
                {'critical', 'error', 'warning', 'info', 'debug', 'trace'}.
            **kwargs: Additional arguments to be passed to :func:`uvicorn.run`.
        """
        uvicorn.run(
            self.app,
            host=host,
            port=port,
            log_level=log_level,
            **kwargs,
        )
class SingleDatasetRest(Rest):
    """Publish exactly one Xarray dataset via a REST API (FastAPI application).

    For serving several datasets at once, use :class:`xpublish.Rest`.
    """

    def __init__(
        self,
        dataset: xr.Dataset,
        routers: Optional[List[APIRouter]] = None,
        cache_kws: Optional[Dict] = None,
        app_kws: Optional[Dict] = None,
        plugins: Optional[Dict[str, Plugin]] = None,
    ):
        """Initialize the SingleDatasetRest object.

        Args:
            dataset: The :class:`xarray.Dataset` object to serve.
            routers: Dataset-specific :class:`fastapi.APIRouter` instances to
                include in the fastAPI application, in addition to any routers
                loaded via plugins. Items may also be tuples of the form
                ``[(router1, {'prefix': '/foo', 'tags': ['foo', 'bar']})]``:
                the 1st element is a :class:`fastapi.APIRouter` instance and
                the 2nd element is a dictionary of keyword arguments for
                :meth:`fastapi.FastAPI.include_router`.
            cache_kws: Keyword arguments for :meth:`cachey.Cache.__init__()`.
                The cache size defaults to 1MB; change it with
                ``available_bytes``.
            app_kws: Keyword arguments for :meth:`fastapi.FastAPI.__init__()`.
            plugins: Optional dictionary of loaded, configured plugins.
                Overrides automatic loading of plugins. Pass an empty dict if
                no plugins are desired.
        """
        self._dataset = dataset
        # The parent receives an empty mapping; the dataset itself is exposed
        # through the accessor installed by setup_datasets().
        super().__init__(
            datasets={},
            routers=routers,
            cache_kws=cache_kws,
            app_kws=app_kws,
            plugins=plugins,
        )

    def setup_datasets(self, datasets) -> str:
        """Point dataset loading at the single dataset instead of a mapping.

        Args:
            datasets: Ignored; present for compatibility with the parent class.

        Returns:
            The (empty) prefix used for dataset routers.
        """
        prefix = ''
        self._dataset_route_prefix = prefix
        self._datasets = {}
        # Looked up on each call, so the accessor always returns the current
        # value of ``self._dataset``.
        self._get_dataset_func = lambda: self._dataset
        return self._dataset_route_prefix

    def _init_app(self) -> FastAPI:
        """Build the application and install the single-dataset OpenAPI override.

        Returns:
            FastAPI application instance.
        """
        application = super()._init_app()
        self._app = application

        overrider = SingleDatasetOpenAPIOverrider(self._app)
        self._app.openapi = overrider.openapi

        return self._app