Source code for datacatalog.authorization
import hmac
import logging
import re
import typing as T
import urllib.parse

from aiohttp import web
import jwt
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
async def _extract_scopes(request: web.Request) -> T.Set:
    """Return the set of scopes granted by the request's bearer token.

    The ``Authorization`` header is read from the request. When it carries a
    bearer credential, the token is decoded as a JWT using the verifier
    selected by the token's ``kid`` header from
    ``request.app['jwks'].verifiers``.

    :param request: the incoming request.
    :returns: the entries of the token's ``scopes`` claim as a set; an empty
        set when there is no ``Authorization`` header or when it is not a
        bearer credential.
    :raises web.HTTPBadRequest: when the token header cannot be read, has no
        ``kid``, names an unknown key, the token fails to decode, or the
        payload has no list-valued ``scopes`` claim.
    """
    raw_header = request.headers.get('authorization')
    bearer = None
    if raw_header is not None:
        bearer = re.fullmatch(r'bearer ([-\w.=]+)', raw_header, flags=re.IGNORECASE)
    if not bearer:
        # No header at all, or some other authorization scheme: no scopes.
        return set()
    token = bearer[1]

    # The unverified header is only used to find out which key to verify with.
    try:
        jwt_header = jwt.get_unverified_header(token)
    except (jwt.InvalidTokenError, jwt.DecodeError):
        raise web.HTTPBadRequest(text='JWT decode error while reading header') from None
    if 'kid' not in jwt_header:
        raise web.HTTPBadRequest(text='Did not get a valid key identifier') from None
    key_id = jwt_header['kid']

    verifiers = request.app['jwks'].verifiers
    if key_id not in verifiers:
        raise web.HTTPBadRequest(text="Unknown key identifier: {}".format(key_id)) from None
    verifier = verifiers[key_id]

    try:
        claims = jwt.decode(
            token, verify=True,
            key=verifier.key,
            algorithms=verifier.alg
        )
    except jwt.InvalidTokenError:
        raise web.HTTPBadRequest(text='Invalid Bearer token') from None

    granted = claims.get('scopes')
    if not isinstance(granted, list):
        # Covers both a missing claim and a claim of the wrong type.
        raise web.HTTPBadRequest(
            text='No scopes in access token'
        )
    return set(granted)
async def _extract_api_key_info(request: web.Request,
                                security_scheme: T.Dict) -> T.Any:
    """Tell whether the request presents the configured admin API key.

    The key is expected in the ``Authorization`` header, in the form
    ``apikey <key>``.

    :param request: the incoming request.
    :param security_scheme: the ``apiKey`` security scheme definition; it
        must declare ``in: header`` and ``name: Authorization``.
    :returns: ``True`` when the presented key equals
        ``request.app['config']['authz_admin']['api_key']``; ``False`` when
        the header is absent, has another form, or the key differs.
    """
    assert security_scheme['in'] == 'header'
    assert security_scheme['name'] == 'Authorization'
    authorization_header = request.headers.get('authorization')
    if authorization_header is None:
        return False
    match = re.fullmatch(r'apikey ([-\w]+=*)', authorization_header)
    if not match:
        return False
    expected = request.app['config']['authz_admin']['api_key']
    if not isinstance(expected, str):
        # A non-string configured key can never equal the header text.
        return False
    # Constant-time comparison: the presented key is attacker-controlled, and
    # a plain ``==`` leaks how many leading characters were correct through
    # response timing.  Compare bytes because ``compare_digest`` rejects
    # non-ASCII ``str`` operands, which ``\w`` can match.
    return hmac.compare_digest(match[1].encode('utf-8'), expected.encode('utf-8'))
async def _extract_authz_info(request: web.Request,
                              security_definitions: T.Dict[str, T.Dict[str, T.Any]]
                              ) -> T.Dict[str, T.Any]:
    """Collect the authorization information for every security scheme.

    :param request: the incoming request.
    :param security_definitions: security schemes keyed by name; each scheme
        must have a ``type`` entry.
    :returns: a dict keyed like ``security_definitions``.  For an ``oauth2``
        scheme the value is the result of :func:`_extract_scopes`; for an
        ``apiKey`` scheme it is the result of :func:`_extract_api_key_info`.
    :raises web.HTTPInternalServerError: when a scheme has any other type.
    """
    result = {}
    for key, security_scheme in security_definitions.items():
        security_type = security_scheme['type']
        if security_type == 'oauth2':
            result[key] = await _extract_scopes(request)
        elif security_type == 'apiKey':
            result[key] = await _extract_api_key_info(request, security_scheme)
        else:
            # Pass the value as a logging argument so formatting is deferred
            # to the logging framework instead of done eagerly with ``%``.
            _logger.error('Unknown security type: %s', security_type)
            raise web.HTTPInternalServerError()
    return result
async def middleware(app, handler):
    """Build a handler that enforces OpenAPI ``security`` requirements.

    The request path, with the path component of the configured base URL
    removed, is looked up in the ``paths`` section of ``app['openapi']``.
    When the matching entry has a ``security`` key, its requirements are
    enforced before the wrapped handler runs.

    :param app: the application; ``app['openapi']`` and
        ``app.config['web']['baseurl']`` are read.
    :param handler: the next handler in the chain.
    :returns: the wrapping request handler.
    """
    spec = app['openapi']
    base_path = urllib.parse.urlparse(app.config['web']['baseurl']).path
    # When the base path ends in a slash, leave that slash in place so the
    # remaining request path still starts with '/'.
    strip = len(base_path) - 1 if base_path.endswith('/') else len(base_path)
    path_items = spec['paths']

    async def middleware_handler(request: web.Request) -> web.Response:
        relative_path = request.rel_url.raw_path[strip:]
        http_method = request.method
        matched_path, spec_entry = _get_path_spec(
            path_items, relative_path, http_method.lower()
        )
        if matched_path is not None and 'security' in spec_entry:
            await _enforce_one_of(request, spec_entry['security'])
        return await handler(request)

    return middleware_handler
async def _enforce_one_of(request: web.Request,
                          security_requirements: T.List[T.Dict[
                              str, T.Optional[T.Iterable]]
                          ]):
    """Require that at least one of the alternatives is satisfied.

    The alternatives are tried in order and checking stops at the first one
    that :func:`_enforce_all_of` accepts.

    :param request: the incoming request.
    :param security_requirements: the list of alternative requirements.
    :raises web.HTTPUnauthorized: when no alternative is satisfied, which
        includes the case of an empty list.
    """
    for alternative in security_requirements:
        satisfied = await _enforce_all_of(request, alternative)
        if satisfied:
            break
    else:
        # The loop ran to completion without finding a satisfied alternative.
        raise web.HTTPUnauthorized()
async def _enforce_all_of(request: web.Request,
                          security_requirements: T.Dict[
                              str, T.Optional[T.Iterable]
                          ]) -> bool:
    """Tell whether the request satisfies every entry of one requirement.

    The security schemes are read from
    ``request.app['openapi']['components']['securitySchemes']``.

    :param request: the incoming request.
    :param security_requirements: maps a security scheme name to the scopes
        required for it; ``None`` is treated as "no scopes required".
    :returns: ``True`` when, for every entry, an ``oauth2`` scheme grants all
        required scopes and an ``apiKey`` scheme reported a truthy result;
        ``False`` as soon as one entry is not satisfied.
    :raises web.HTTPInternalServerError: when a referenced scheme has a type
        other than ``oauth2`` or ``apiKey``.
    """
    openapi = request.app['openapi']
    security_definitions = openapi['components']['securitySchemes']
    all_authz_info = await _extract_authz_info(request, security_definitions)
    for requirement, scopes in security_requirements.items():
        authz_info = all_authz_info[requirement]
        security_type = security_definitions[requirement]['type']
        if security_type == 'oauth2':
            # The scope list is declared Optional; ``set(None)`` would raise
            # TypeError, so a missing list means nothing is required.
            required_scopes = set(() if scopes is None else scopes)
            if not required_scopes.issubset(authz_info):
                return False
        elif security_type == 'apiKey':
            if not authz_info:
                return False
        else:
            # Deferred formatting: let the logging framework build the text.
            _logger.error('Unexpected security type: %s', security_type)
            raise web.HTTPInternalServerError()
    return True
[docs]def _get_path_spec(paths: dict, path: str, method: str=None) -> T.Optional[T.Tuple[str, str]]:
"""Adapted from swagger-parser library."""
# Get the specification of the given path
path_spec = None
path_name = None
if path in paths:
path_spec = paths[path]
path_name = path
else:
for base_path in paths.keys():
regex_from_path = re.compile(re.sub('{[^/]*}', '([^/]*)', base_path))
if re.fullmatch(regex_from_path, path):
path_spec = paths[base_path]
path_name = base_path
# Test method if given
if path_spec is not None and method is not None and method in path_spec.keys():
path_spec = path_spec[method]
return path_name, path_spec