from typing import Dict, List, Optional, Tuple, Union

import cachey
import pluggy
import uvicorn
import xarray as xr
from fastapi import APIRouter, FastAPI, HTTPException

from .dependencies import get_cache, get_dataset, get_dataset_ids, get_plugin_manager
from .plugins import Dependencies, Plugin, PluginSpec, get_plugins, load_default_plugins
from .routers import dataset_collection_router
from .utils.api import (
    SingleDatasetOpenAPIOverrider,
    check_route_conflicts,
    normalize_app_routers,
    normalize_datasets,
)
# Keyword arguments that accompany a router; they are unpacked into
# ``FastAPI.include_router`` when the application is assembled.
RouterKwargs = Dict
# A router paired with the keyword arguments used when including it in the app.
RouterAndKwargs = Tuple[APIRouter, RouterKwargs]
class Rest:
    """Used to publish multiple Xarray Datasets via a REST API (FastAPI application).

    To publish a single dataset via its own FastAPI application, you might
    want to use the :attr:`xarray.Dataset.rest` accessor for more convenience.
    Additionally the :class:`xpublish.SingleDatasetRest` class has
    a simplified interface for single dataset access.

    Parameters
    ----------
    datasets :
        A mapping of datasets objects to be served. Keys are used as
        dataset ids and are converted to strings. Passing a bare
        :class:`xarray.Dataset` raises :class:`TypeError`.
        See also the notes below.
    routers :
        A list of dataset-specific :class:`fastapi.APIRouter` instances to
        include in the fastAPI application. These routers are in addition
        to any loaded via plugins.
        The items of the list may also be tuples with the following format:
        ``[(router1, {'prefix': '/foo', 'tags': ['foo', 'bar']})]``, where
        the 1st tuple element is a :class:`fastapi.APIRouter` instance and the
        2nd element is a dictionary that is used to pass keyword arguments to
        :meth:`fastapi.FastAPI.include_router`.
    cache_kws :
        Dictionary of keyword arguments to be passed to
        :meth:`cachey.Cache.__init__()`.
        By default, the cache size is set to 1MB, but this can be changed with
        ``available_bytes``.
    app_kws :
        Dictionary of keyword arguments to be passed to
        :meth:`fastapi.FastAPI.__init__()`.
    plugins :
        Optional dictionary of loaded, configured plugins.
        Overrides automatic loading of plugins.
        If no plugins are desired, set to an empty dict.

    Notes
    -----
    All dataset-specific endpoint urls have the prefix
    ``/datasets/{dataset_id}``, where ``{dataset_id}`` corresponds to the keys
    of the mapping (converted to strings). The endpoint ``/datasets`` is also
    added and returns the list of all dataset ids.

    """

    def __init__(
        self,
        datasets: Optional[Dict[str, xr.Dataset]] = None,
        routers: Optional[List[Union[APIRouter, RouterAndKwargs]]] = None,
        cache_kws: Optional[Dict] = None,
        app_kws: Optional[Dict] = None,
        plugins: Optional[Dict[str, Plugin]] = None,
    ):
        if isinstance(datasets, xr.Dataset):
            raise TypeError(
                'xpublish.Rest no longer directly handles single datasets. Please use xpublish.SingleDatasetRest instead'
            )

        # Datasets must be set up first: it defines the route prefix that the
        # router normalization below depends on.
        self.setup_datasets(datasets or {})
        self.setup_plugins(plugins)

        # Validate user supplied routers eagerly so conflicts surface at
        # construction time rather than on first access of ``app``.
        # NOTE(review): these already-normalized routers are passed through
        # ``normalize_app_routers`` a second time in ``_init_routers``; confirm
        # that helper is safe to apply twice (e.g. does not repeat the prefix).
        routers = normalize_app_routers(routers or [], self._dataset_route_prefix)
        check_route_conflicts(routers)
        self._routers = routers

        self.init_app_kwargs(app_kws)
        self.init_cache_kwargs(cache_kws)

    def setup_datasets(self, datasets: Dict[str, xr.Dataset]) -> str:
        """Initialize datasets and dataset accessor function

        Returns:
            Prefix for dataset routers
        """
        self._datasets = normalize_datasets(datasets)
        self._get_dataset_func = self.get_dataset_from_plugins
        self._dataset_route_prefix = '/datasets/{dataset_id}'
        return self._dataset_route_prefix

    def get_datasets_from_plugins(self) -> List[str]:
        """Return dataset ids from directly loaded datasets and plugins

        Used as a FastAPI dependency in dataset router plugins
        via :meth:`Rest.dependencies`.

        Returns:
            Dataset IDs from plugins and datasets loaded into
            :class:`xpublish.Rest` at initialization.
        """
        # Directly provided datasets come first, followed by each plugin's ids.
        dataset_ids = list(self._datasets)

        for plugin_dataset_ids in self.pm.hook.get_datasets():
            dataset_ids.extend(plugin_dataset_ids)

        return dataset_ids

    def get_dataset_from_plugins(self, dataset_id: str) -> xr.Dataset:
        """Attempt to load dataset from plugins, otherwise return dataset from passed in dictionary of datasets

        Parameters:
            dataset_id:
                Unique key of dataset to attempt to load from plugins or
                those provided to :class:`xpublish.Rest` at initialization.

        Returns:
            Dataset for selected ``dataset_id``

        Raises:
            FastAPI.HTTPException: When a dataset is not found a 404 error is returned.
        """
        dataset = self.pm.hook.get_dataset(dataset_id=dataset_id)

        # Compare against None rather than relying on truthiness: an
        # xarray.Dataset without data variables evaluates as falsy, which
        # would wrongly discard a valid dataset returned by a plugin.
        # NOTE(review): assumes the ``get_dataset`` hook yields a single
        # result or None (pluggy ``firstresult``) - confirm in PluginSpec.
        if dataset is not None:
            return dataset

        if dataset_id not in self._datasets:
            raise HTTPException(status_code=404, detail=f"Dataset '{dataset_id}' not found")

        return self._datasets[dataset_id]

    def setup_plugins(self, plugins: Optional[Dict[str, Plugin]] = None):
        """Initialize and load plugins from entry_points unless explicitly provided

        Parameters:
            plugins:
                If a dictionary of initialized plugins is provided,
                then the automatic loading of plugins is disabled.
                Providing an empty dictionary will also disable
                automatic loading of plugins.
        """
        # Only ``None`` triggers automatic loading; an empty dict is respected.
        if plugins is None:
            plugins = load_default_plugins()

        self.pm = pluggy.PluginManager('xpublish')
        self.pm.add_hookspecs(PluginSpec)

        for name, plugin in plugins.items():
            self.pm.register(plugin, name=name)

        # Plugins may contribute additional hook specifications of their own.
        for hookspec in self.pm.hook.register_hookspec():
            self.pm.add_hookspecs(hookspec)

    def register_plugin(
        self, plugin: Plugin, plugin_name: Optional[str] = None, overwrite: bool = False
    ):
        """
        Register a plugin with the xpublish system

        Args:
            plugin: Instantiated Plugin object
            plugin_name: Plugin name, defaults to ``plugin.name``
            overwrite: If a plugin of the same name exist,
                setting this to True will remove the existing plugin before
                registering the new plugin. Defaults to False.

        Raises:
            AttributeError: Plugin can not be registered
            ValueError: Plugin already registered, try setting overwrite to True
        """
        plugin_name = plugin_name or plugin.name

        if overwrite is True and plugin_name in dict(self.pm.list_name_plugin()):
            # If a plugin exist with the same name, unregister it.
            # If configured using entry_points, the name of the
            # entry_point should be the same as the plugin.name.
            self.pm.unregister(name=plugin_name)

        # Snapshot the plugins registered before the new one is added, so the
        # hookspec registration below can be limited to the new plugin.
        existing_plugins = self.pm.get_plugins()

        try:
            self.pm.register(plugin, plugin_name)
        except AttributeError as e:
            raise AttributeError(
                f'Plugin {plugin} is likely not initialized before registration'
            ) from e

        # Call ``register_hookspec`` with all previously registered plugins
        # removed from the caller.
        for hookspec in self.pm.subset_hook_caller(
            'register_hookspec', remove_plugins=existing_plugins
        )():
            self.pm.add_hookspecs(hookspec)

    def init_cache_kwargs(self, cache_kws):
        """Set up cache kwargs"""
        # The cache itself is created lazily by the ``cache`` property.
        self._cache = None
        self._cache_kws = {'available_bytes': 1e6}
        if cache_kws is not None:
            self._cache_kws.update(cache_kws)

    def init_app_kwargs(self, app_kws):
        """Set up FastAPI application kwargs"""
        # The application itself is created lazily by the ``app`` property.
        self._app = None
        self._app_kws = {}
        if app_kws is not None:
            self._app_kws.update(app_kws)

    @property
    def cache(self) -> cachey.Cache:
        """Returns the :class:`cachey.Cache` instance used by the FastAPI application."""
        if self._cache is None:
            self._cache = cachey.Cache(**self._cache_kws)
        return self._cache

    @property
    def plugins(self) -> Dict[str, Plugin]:
        """Returns the loaded plugins"""
        return dict(self.pm.list_name_plugin())

    def _init_routers(self, dataset_routers: Optional[List[Union[APIRouter, RouterAndKwargs]]]):
        """Setup plugin and dataset routers. Needs to run after dataset and plugin setup"""
        app_routers, plugin_dataset_routers = self.plugin_routers()

        # An empty prefix means no dataset collection is being served, in
        # which case the collection router is left out.
        if self._dataset_route_prefix:
            app_routers.append((dataset_collection_router, {'tags': ['info']}))

        app_routers.extend(
            normalize_app_routers(
                plugin_dataset_routers + (dataset_routers or []), self._dataset_route_prefix
            )
        )

        check_route_conflicts(app_routers)

        self._app_routers = app_routers

    def plugin_routers(self) -> Tuple[List[RouterAndKwargs], List[RouterAndKwargs]]:
        """Load the app and dataset routers for plugins

        Returns:
            A tuple containing a list of top-level routers from plugins
            and a list of per-dataset routers from plugins
        """
        app_routers = []
        dataset_routers = []

        deps = self.dependencies()

        # Plugin routers are paired with empty kwargs.
        for router in self.pm.hook.app_router(deps=deps):
            app_routers.append((router, {}))

        for router in self.pm.hook.dataset_router(deps=deps):
            dataset_routers.append((router, {}))

        return app_routers, dataset_routers

    def dependencies(self) -> Dependencies:
        """FastAPI dependencies to pass to plugin router methods"""
        # Lambdas defer attribute access until the dependency is called.
        deps = Dependencies(
            dataset_ids=self.get_datasets_from_plugins,
            dataset=self._get_dataset_func,
            cache=lambda: self.cache,
            plugins=lambda: self.plugins,
            plugin_manager=lambda: self.pm,
        )

        return deps

    def _init_dependencies(self):
        """Initialize dependencies"""
        deps = self.dependencies()

        # Replace the placeholder dependency functions with ones bound to
        # this instance.
        self._app.dependency_overrides[get_dataset_ids] = deps.dataset_ids
        self._app.dependency_overrides[get_dataset] = deps.dataset
        self._app.dependency_overrides[get_cache] = deps.cache
        self._app.dependency_overrides[get_plugins] = deps.plugins
        self._app.dependency_overrides[get_plugin_manager] = deps.plugin_manager

    def _init_app(self):
        """Initiate the FastAPI application."""
        self._app = FastAPI(**self._app_kws)

        try:
            self._init_routers(self._routers)
            for rt, kwargs in self._app_routers:
                self._app.include_router(rt, **kwargs)

            self._init_dependencies()
        except Exception:
            # Do not leave a partially configured application cached:
            # otherwise a later access of ``app`` would silently return an
            # application that is missing routers or dependency overrides.
            self._app = None
            raise

        return self._app

    @property
    def app(self) -> FastAPI:
        """Returns the :class:`fastapi.FastAPI` application instance.

        Notes
        -----
        Plugins registered with :meth:`xpublish.Rest.register_plugin` after :meth:`xpublish.Rest.app`
        is accessed or :meth:`xpublish.Rest.serve` is called once may not take effect.
        """
        if self._app is None:
            self._app = self._init_app()
        return self._app

    def serve(self, host: str = '0.0.0.0', port: int = 9000, log_level: str = 'debug', **kwargs):
        """Serve this FastAPI application via :func:`uvicorn.run`.

        Parameters
        ----------
        host :
            Bind socket to this host.
        port :
            Bind socket to this port.
        log_level :
            App logging level, valid options are
            {'critical', 'error', 'warning', 'info', 'debug', 'trace'}.
        **kwargs :
            Additional arguments to be passed to :func:`uvicorn.run`.

        Notes
        -----
        This method is blocking and does not return.

        """
        uvicorn.run(self.app, host=host, port=port, log_level=log_level, **kwargs)
class SingleDatasetRest(Rest):
    """Used to publish a single Xarray dataset via a REST API (FastAPI application).

    Use :class:`xpublish.Rest` to publish multiple datasets.

    Parameters
    ----------
    dataset :
        A single :class:`xarray.Dataset` object to be served.
    routers :
        A list of dataset-specific :class:`fastapi.APIRouter` instances, or
        ``(router, kwargs)`` tuples, forwarded to :class:`xpublish.Rest`.
    cache_kws :
        Dictionary of cache keyword arguments, forwarded to :class:`xpublish.Rest`.
    app_kws :
        Dictionary of FastAPI application keyword arguments, forwarded to
        :class:`xpublish.Rest`.
    plugins :
        Optional dictionary of loaded, configured plugins, forwarded to
        :class:`xpublish.Rest`.

    """

    def __init__(
        self,
        dataset: xr.Dataset,
        routers: Optional[List[Union[APIRouter, RouterAndKwargs]]] = None,
        cache_kws: Optional[Dict] = None,
        app_kws: Optional[Dict] = None,
        plugins: Optional[Dict[str, Plugin]] = None,
    ):
        # The dataset must be stored before the parent initializer runs.
        self._dataset = dataset

        # No dataset mapping is handed to the parent; ``setup_datasets``
        # below ignores it anyway.
        super().__init__({}, routers, cache_kws, app_kws, plugins)

    def setup_datasets(self, datasets: Dict[str, xr.Dataset]) -> str:
        """Modifies the dataset loading to instead connect to the
        single dataset

        Parameters:
            datasets:
                Ignored; present only to match the signature of
                :meth:`xpublish.Rest.setup_datasets`.

        Returns:
            Prefix for dataset routers, which is empty for a single dataset.
        """
        self._dataset_route_prefix = ''
        self._datasets = {}
        # The accessor takes no dataset id: it always returns the one dataset.
        self._get_dataset_func = lambda: self._dataset

        return self._dataset_route_prefix

    def _init_app(self):
        """Initiate the FastAPI application and replace its OpenAPI schema generator."""
        self._app = super()._init_app()

        # Swap in the schema generator provided by SingleDatasetOpenAPIOverrider.
        self._app.openapi = SingleDatasetOpenAPIOverrider(self._app).openapi

        return self._app