Source code for datacatalog.plugins.swift

import pkg_resources
import yaml
import logging
import typing as T
from uuid import uuid4

from aiohttp import helpers, web
import aiohttp
import aiopluggy

_hookimpl = aiopluggy.HookimplMarker('datacatalog')
_logger = logging.getLogger(__name__)

_BASE_URL = None
_AUTHORIZATION = None


[docs]@_hookimpl def initialize_sync(app): # language=rst """ Initialize the plugin. - validates the configuration loaded by `app`; - initializes `_BASE_URL` and `_AUTHORIZATION`; - registers a new endpoint ``/files`` in the HTTP service. """ # validate configuration with pkg_resources.resource_stream(__name__, 'config_schema.yml') as s: schema = yaml.load(s) app.config.validate(schema) global _BASE_URL, _AUTHORIZATION config = app.config['storage_swift'] _BASE_URL = config['base_url'] + config['container'] + '/' _AUTHORIZATION = aiohttp.BasicAuth(config['user'], config['password']).encode() app.router.add_post(app['path'] + 'files', post)
[docs]async def _put_file_to_object_store(uuid: str, content_type: str, data, filename: T.Optional[str]=None): _logger.debug("url: %s\n", _BASE_URL + uuid) _logger.debug("Authorization: %s", _AUTHORIZATION) headers = { 'Authorization': _AUTHORIZATION, 'Content-Type': content_type } if filename is not None: headers['Content-Disposition'] = helpers.content_disposition_header( 'attachment', filename=filename ) async with aiohttp.ClientSession() as session: async with session.put( _BASE_URL + uuid, data=data, headers=headers, chunked=True, expect100=True ) as response: if response.status >= 400: _logger.error( "Couldn't store file in object store.\n" "%s %s:\n%s\n%r", response.status, response.reason, await response.text(), response ) raise web.HTTPBadGateway()
[docs]async def post(request: web.Request) -> web.Response: # language=rst """POST handler for ``/files``""" mimetype = helpers.parse_mimetype(request.headers[aiohttp.hdrs.CONTENT_TYPE]) if mimetype.type != 'multipart': raise web.HTTPBadRequest( text='multipart/* content type expected' ) reader = await request.multipart() field = await reader.next() if field.name != 'distribution': raise web.HTTPBadRequest( text='file field must be named “distribution”' ) uuid = uuid4().hex content_type = field.headers.get( aiohttp.hdrs.CONTENT_TYPE, 'application/octet-stream' ) await _put_file_to_object_store( uuid, content_type, field, filename=field.filename ) return web.Response( status=201, headers={ 'Location': _BASE_URL + uuid } )