Source code for datacatalog.application

import importlib
import urllib.parse
import logging

from aiohttp import web
import aiohttp_cors
import aiopluggy

from datacatalog import startup_actions
from . import authorization, config, handlers, jwks, openapi, plugin_interfaces

logger = logging.getLogger(__name__)


[docs]class Application(web.Application): # language=rst """ The Application. .. todo:: Inheritance from ``web.Application`` is discouraged by aiohttp. This class, with its initializer, must be replaced by an application factory method that builds an application object from ``web.Application``. """ def __init__(self, *args, middlewares=None, **kwargs): middlewares = [] if middlewares is None else list(middlewares) # add required middlewares middlewares.extend([ web.normalize_path_middleware(), # todo: needed? # Make comment from the following line to disable authentication for local testing authorization.middleware ]) super().__init__(*args, middlewares=middlewares, **kwargs) # Initialize config: self._config = config.load() self._pool = None # set app properties path = urllib.parse.urlparse(self._config['web']['baseurl']).path if len(path) == 0 or path[-1] != '/': path += '/' self['path'] = path self['openapi'] = openapi.openapi self['jwks'] = jwks.load(self._config['jwks']) # set routes self.router.add_get(path + 'datasets', handlers.datasets.get_collection) self.router.add_post(path + 'datasets', handlers.datasets.post_collection) self.router.add_get(path + 'datasets/{dataset}', handlers.datasets.get) self.router.add_put(path + 'datasets/{dataset}', handlers.datasets.put) self.router.add_delete(path + 'datasets/{dataset}', handlers.datasets.delete) self.router.add_get(path + 'datasets/{dataset}/purls/{distribution}', handlers.datasets.link_redirect) self.router.add_get(path + 'openapi', handlers.openapi.get) self.router.add_get(path + 'system/health', handlers.systemhealth.get) # Load and initialize plugins: self._pm = aiopluggy.PluginManager('datacatalog') self._pm.register_specs(plugin_interfaces) self.on_startup.append(_on_startup) self.on_cleanup.append(_on_cleanup) self._load_plugins() self._initialize_sync() # CORS # this must be done after initialize_sync: plugins may register new # routes during setup and our allow_cors applies to all routes. if 'allow_cors' in self._config['web'] and self._config['web']['allow_cors']: cors = aiohttp_cors.setup(self, defaults={ '*': aiohttp_cors.ResourceOptions( expose_headers="*", allow_headers="*" ), }) for route in list(self.router.routes()): cors.add(route)
[docs] def _initialize_sync(self): results = self.hooks.initialize_sync(app=self) for r in results: if r.exception is not None: raise r.exception
@property def config(self) -> config.ConfigDict: return self._config @property def pool(self): return self._pool @property def pm(self) -> aiopluggy.PluginManager: return self._pm @property def hooks(self): return self._pm.hooks
[docs] def _load_plugins(self): for fq_name in self.config['plugins']: plugin = _resolve_plugin_path(fq_name) self.pm.register(plugin) missing = self.pm.missing() if len(missing) > 0: raise Exception( "There are no implementations for the following required hooks: %s" % missing )
[docs]async def _on_startup(app): results = await app.hooks.initialize(app=app) for r in results: if r.exception is not None: raise r.exception await startup_actions.run_startup_actions(app)
[docs]async def _on_cleanup(app): await app.hooks.deinitialize(app=app)
[docs]def _resolve_plugin_path(fq_name: str): """ Resolve the path to a plugin (module, class, or instance). :param str fq_name: The fully qualified name of a module, class or instance. :raises ModuleNotFoundError: if the module could not be found :raises AttributeError: if the plugin could not be found in the module """ segments = fq_name.split('.') nseg = len(segments) mod = None while nseg > 0: module_name = '.'.join(segments[:nseg]) try: mod = importlib.import_module(module_name) break except ModuleNotFoundError: pass nseg = nseg - 1 if mod is None: raise ModuleNotFoundError(fq_name) result = mod for segment in segments[nseg:]: result = getattr(result, segment) return result