import csv
import json.decoder
# import os.path
import logging
import re
import typing as T
# import urllib.parse
from aiohttp import web
from pyld import jsonld
from aiohttp_extras import conditional
from aiohttp_extras.content_negotiation import produces_content_types
from datacatalog.dcat import Direction
from datacatalog.handlers.openapi import clear_open_api_cache
logger = logging.getLogger('datacatalog')
_DCAT_ID_KEY = '@id'
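# Matches facet filter keys: one or more '/properties/<name>' segments, each
# optionally followed by '/items'.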
_FACET_QUERY_KEY = re.compile(
r'(?:/properties/[^/=~<>]+(?:/items)?)+'
)
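# Matches facet filter values of the form '<comparator>=<value>', where the
# comparator is one of: in, eq, gt, lt, ge, le.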
_FACET_QUERY_VALUE = re.compile(
r'(in|eq|gt|lt|ge|le)=(.*)', flags=re.S
)
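# Matches one segment of a comma-separated list; a comma or backslash inside
# a segment is escaped with a backslash.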
_COMMA_SEPARATED_SEGMENT = re.compile(
r'(?:\\.|[^,\\])+'
)
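# Matches a single backslash-escaped character, used for unescaping segments.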
_ESCAPED_CHARACTER = re.compile(r'\\(.)')
def _add_blank_node_identifiers_to(distributions: T.Iterable[dict]) -> None:
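    """Give each distribution a blank node identifier ``_:d1``, ``_:d2``, etc."""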
    for counter, distribution in enumerate(distributions, start=1):
distribution['@id'] = "_:d{}".format(counter)
def _add_persistent_links_to(prefix, distributions: T.Iterable[dict]) -> None:
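    """Add an ``ams:purl`` (persistent URL) to every distribution that has
    both a ``dcat:accessURL`` and a ``dc:identifier``."""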
for distribution in distributions:
accessURL = distribution.get('dcat:accessURL', None)
if accessURL is None or 'dc:identifier' not in distribution:
continue
distribution['ams:purl'] = prefix + distribution['dc:identifier']
@produces_content_types('application/ld+json', 'application/json')
async def get(request: web.Request):
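    """Handler for a single dataset, i.e. ``GET /datasets/{dataset}``.

    Returns the dataset as JSON-LD and supports conditional requests through
    the ``If-None-Match`` header. If any distribution lacks a
    ``dc:identifier``, one is generated and the updated document is written
    back to storage.
    """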
docid = request.match_info['dataset']
etag_if_none_match = conditional.parse_if_header(
request, conditional.HEADER_IF_NONE_MATCH
)
if etag_if_none_match == '*':
raise web.HTTPBadRequest(
body='Endpoint does not support * in the If-None-Match header.'
)
# now we know the etag is either None or a set
try:
doc, etag = await request.app.hooks.storage_retrieve(
app=request.app, docid=docid, etags=etag_if_none_match
)
except KeyError:
raise web.HTTPNotFound()
if doc is None:
return web.Response(status=304, headers={'Etag': etag})
expanded_doc = jsonld.expand(doc)[0]
canonical_doc = await request.app.hooks.mds_canonicalize(data=expanded_doc, id=docid)
identifiers_added = await _add_distribution_identifiers(request.app, canonical_doc)
if identifiers_added > 0:
hooks = request.app.hooks
# Let the metadata plugin grab the full-text search representation
searchable_text = await hooks.mds_full_text_search_representation(
data=canonical_doc
)
new_etag = await hooks.storage_update(
app=request.app, docid=docid, doc=canonical_doc,
searchable_text=searchable_text, etags={etag},
iso_639_1_code="nl")
etag = new_etag
logger.debug(f'Added {identifiers_added} identifiers for {docid}')
_add_blank_node_identifiers_to(canonical_doc['dcat:distribution'])
_add_persistent_links_to(_datasets_url(request) + f'/{docid}/purls/', canonical_doc['dcat:distribution'])
    return web.json_response(canonical_doc, headers={'Etag': etag},
                             content_type='application/ld+json')
async def put(request: web.Request):
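    """Handler for ``PUT /datasets/{dataset}``.

    Creates a new dataset when called with ``If-None-Match: *``, or updates
    an existing one when called with an ``If-Match`` header containing one
    or more ETags.
    """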
hooks = request.app.hooks
# Grab the document from the request body and canonicalize it.
try:
doc = await request.json()
except json.decoder.JSONDecodeError:
raise web.HTTPBadRequest(text='invalid json')
canonical_doc = await hooks.mds_canonicalize(data=doc, direction=Direction.PUT)
# Make sure the docid in the path corresponds to the ids given in the
# document, if any. The assumption is that request.path (which corresponds
# to the path part of the incoming HTTP request) is the path as seen by
# the client. This is not necessarily true.
docid = request.match_info['dataset']
docurl = f"ams-dcatd:{docid}"
if '@id' in canonical_doc:
if canonical_doc['@id'] != docurl:
raise web.HTTPBadRequest(
text='Invalid @id: {} != {}'.format(
canonical_doc['@id'], docurl
)
)
del canonical_doc[_DCAT_ID_KEY]
if 'dct:identifier' in canonical_doc:
if canonical_doc['dct:identifier'] != docid:
raise web.HTTPBadRequest(
text='Invalid dct:identifier: {} != {}'.format(
canonical_doc['dct:identifier'], docid
)
)
del canonical_doc['dct:identifier']
# Let the metadata plugin grab the full-text search representation
searchable_text = await hooks.mds_full_text_search_representation(
data=canonical_doc
)
identifiers_added = await _add_distribution_identifiers(request.app, canonical_doc)
if identifiers_added > 0:
logger.debug(f'Added {identifiers_added} identifiers for {docid}')
# Figure out the mode (insert / update) of the request.
etag_if_match = conditional.parse_if_header(
request, conditional.HEADER_IF_MATCH
)
etag_if_none_match = conditional.parse_if_header(
request, conditional.HEADER_IF_NONE_MATCH
)
# Can't accept a value in both headers
if etag_if_match is not None and etag_if_none_match is not None:
raise web.HTTPBadRequest(
body='Endpoint supports either If-Match or If-None-Match in a '
'single request, not both'
)
# If-Match: {etag, ...} is for updates
if etag_if_match is not None:
if etag_if_match == '*':
raise web.HTTPBadRequest(
body='Endpoint does not support If-Match: *. Must provide one '
'or more Etags.'
)
try:
new_etag = await hooks.storage_update(
app=request.app, docid=docid, doc=canonical_doc,
searchable_text=searchable_text, etags=etag_if_match,
iso_639_1_code="nl"
)
except ValueError:
raise web.HTTPPreconditionFailed()
clear_open_api_cache()
return web.Response(status=204, headers={'Etag': new_etag})
# If-None-Match: * is for creates
if etag_if_none_match != '*':
raise web.HTTPBadRequest(
body='For inserts of new documents, provide If-None-Match: *'
)
try:
new_etag = await hooks.storage_create(
app=request.app, docid=docid, doc=canonical_doc,
searchable_text=searchable_text, iso_639_1_code="nl"
)
except KeyError:
raise web.HTTPPreconditionFailed()
clear_open_api_cache()
return web.Response(
status=201, headers={'Etag': new_etag}, content_type='text/plain'
)
async def delete(request: web.Request):
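    """Handler for ``DELETE /datasets/{dataset}``.

    Requires an ``If-Match`` header with one or more ETags.
    """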
given_id = request.match_info['dataset']
etag_if_match = conditional.parse_if_header(request, conditional.HEADER_IF_MATCH)
if etag_if_match is None or etag_if_match == '*':
raise web.HTTPBadRequest(
            text='Must provide an If-Match header containing one or more ETags.'
)
try:
await request.app.hooks.storage_delete(
app=request.app, docid=given_id, etags=etag_if_match)
except KeyError:
raise web.HTTPNotFound()
clear_open_api_cache()
return web.Response(status=204, content_type='text/plain')
def get_last_modified_date(doc: dict) -> str:
"""
    Determine the date to sort a dataset on, in the order below. If a field
    is not filled, the next date is used:

    - refresh date of a resource: dct:modified
    - modification date of a resource: foaf:isPrimaryTopicOf: dct:modified
    - publication date of a resource: foaf:isPrimaryTopicOf: dct:issued

    If a dataset has multiple resources, the most recent date is taken.

    - modification date of the dataset: dct:modified
    - publication date of the dataset: dct:issued
    - modification date of the dataset: foaf:isPrimaryTopicOf: dct:modified
    - publication date of the dataset: foaf:isPrimaryTopicOf: dct:issued

    Args:
        doc: the dataset.

    Returns:
        The last-modified date to use for sorting.
"""
last_modified = ''
if 'dcat:distribution' in doc:
for resource in doc['dcat:distribution']:
if 'dct:modified' in resource and resource['dct:modified'] > last_modified:
last_modified = resource['dct:modified']
if not last_modified:
dct_issued_resource = ''
for resource in doc['dcat:distribution']:
if 'foaf:isPrimaryTopicOf' in resource:
primary = resource['foaf:isPrimaryTopicOf']
if 'dct:modified' in primary and primary['dct:modified'] > last_modified:
last_modified = primary['dct:modified']
                    if 'dct:issued' in primary and primary['dct:issued'] > dct_issued_resource:
dct_issued_resource = primary['dct:issued']
if not last_modified:
last_modified = dct_issued_resource
if not last_modified:
if 'dct:modified' in doc:
last_modified = doc['dct:modified']
elif 'dct:issued' in doc:
last_modified = doc['dct:issued']
elif 'foaf:isPrimaryTopicOf' in doc:
primary = doc['foaf:isPrimaryTopicOf']
if 'dct:modified' in primary:
last_modified = primary['dct:modified']
elif 'dct:issued' in primary:
last_modified = primary['dct:issued']
return last_modified
@produces_content_types('application/ld+json', 'application/json')
async def get_collection(request: web.Request) -> web.StreamResponse:
# language=rst
"""Handler for ``/datasets``"""
hooks = request.app.hooks
query = request.query
# Extract facet filters:
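    # Facet keys are JSON-pointer-like paths into the dataset schema, such as
    # /properties/dcat:keyword/items. Values have the form
    # <comparator>=<value> with a comparator of in, eq, gt, lt, ge or le;
    # for 'in', the value is a CSV-encoded list of strings.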
filters = {}
for key in query:
if not _FACET_QUERY_KEY.fullmatch(key):
continue
if key not in filters:
filters[key] = {}
        match = _FACET_QUERY_VALUE.fullmatch(query[key])
if not match:
raise web.HTTPBadRequest(
text="Unknown comparator in query parameter '%s'" % key
)
comparator = match[1]
value = match[2]
if comparator in filters[key]:
raise web.HTTPBadRequest(
text="Multiple facet filters for facet %s with comparator %s" %
(key, comparator)
)
if comparator == 'in':
value = _csv_decode_line(value)
if value is None:
raise web.HTTPBadRequest(
text="Value of query parameter '%s' is not a CSV encoded list of strings; see RFC4180" % key
)
filters[key][comparator] = value
full_text_query = query.get('q', '').strip()
limit = query.get('limit', None)
if limit is not None:
try:
limit = int(limit)
except ValueError:
raise web.HTTPBadRequest(
text="Invalid limit value %s" % limit
)
    offset = query.get('offset', '0')
    try:
        offset = int(offset)
    except ValueError:
        raise web.HTTPBadRequest(
            text="Invalid offset value %s" % offset
        )
result_info = {}
resultiterator = await hooks.search_search(
app=request.app, q=full_text_query,
result_info=result_info,
sort_field_get=get_last_modified_date,
facets=[
'/properties/dcat:distribution/items/properties/ams:resourceType',
'/properties/dcat:distribution/items/properties/dct:format',
'/properties/dcat:distribution/items/properties/ams:distributionType',
'/properties/dcat:distribution/items/properties/ams:serviceType',
'/properties/dcat:keyword/items',
'/properties/dcat:theme/items',
'/properties/ams:owner'
],
limit=limit, offset=offset,
filters=filters, iso_639_1_code='nl'
)
ctx = await hooks.mds_context()
ctx_json = json.dumps(ctx)
response = web.StreamResponse()
response.content_type = request['best_content_type']
response.enable_compression()
first = True
await response.prepare(request)
await response.write(b'{"@context":')
await response.write(ctx_json.encode())
await response.write(b',"dcat:dataset":[')
async for docid, doc in resultiterator:
canonical_doc = await hooks.mds_canonicalize(data=doc, id=docid)
keepers = {'@id', 'dct:identifier', 'dct:title', 'dct:description',
'dcat:keyword', 'foaf:isPrimaryTopicOf', 'dcat:distribution',
'dcat:theme', 'ams:owner'}
for key in list(canonical_doc.keys()):
if key not in keepers:
del canonical_doc[key]
keepers = {'dct:format', 'ams:resourceType', 'ams:distributionType',
'ams:serviceType', 'dc:identifier'}
for d in canonical_doc.get('dcat:distribution', []):
for key in list(d.keys()):
if key not in keepers:
del d[key]
if not first:
await response.write(b',')
else:
first = False
await response.write(json.dumps(canonical_doc).encode())
await response.write(b']')
await response.write(b', "void:documents": ')
await response.write(str(result_info['/']).encode())
del result_info['/']
await response.write(b', "ams:facet_info": ')
await response.write(json.dumps(result_info).encode())
await response.write(b'}')
await response.write_eof()
return response
async def link_redirect(request: web.Request):
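    """Redirect a persistent distribution link to its ``dcat:accessURL``.

    Responds with ``307 Temporary Redirect`` on success, ``304 Not Modified``
    if one of the given ETags matches, or ``404 Not Found`` if the dataset or
    distribution does not exist.
    """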
dataset = request.match_info['dataset']
distribution = request.match_info['distribution']
etag_if_none_match = conditional.parse_if_header(
request, conditional.HEADER_IF_NONE_MATCH
)
if etag_if_none_match == '*':
raise web.HTTPBadRequest(
body='Endpoint does not support * in the If-None-Match header.'
)
try:
doc, etag = await request.app.hooks.storage_retrieve(
app=request.app, docid=dataset, etags=etag_if_none_match
)
except KeyError:
raise web.HTTPNotFound()
if doc is None:
raise web.HTTPNotModified(headers={'ETag': etag})
headers = {'ETag': etag}
resource_url = None
for dist in doc.get('dcat:distribution', []):
if dist.get('dc:identifier', None) == distribution:
resource_url = dist.get('dcat:accessURL', None)
break
if resource_url is None:
raise web.HTTPNotFound(headers=headers)
raise web.HTTPTemporaryRedirect(location=resource_url, headers=headers)
async def post_collection(request: web.Request):
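    """Handler for ``POST /datasets``.

    Creates a new dataset. The document identifier is derived from
    ``dct:identifier`` or ``@id`` if present; otherwise a new identifier is
    requested from the storage plugin. Responds with ``201 Created`` and a
    ``Location`` header pointing at the new dataset.
    """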
hooks = request.app.hooks
datasets_url = _datasets_url(request)
# Grab the document from the request body and canonicalize it.
try:
doc = await request.json()
except json.decoder.JSONDecodeError:
raise web.HTTPBadRequest(text='invalid json')
canonical_doc = await hooks.mds_canonicalize(data=doc, direction=Direction.PUT)
docurl = canonical_doc.get('@id')
if docurl is not None:
del canonical_doc['@id']
docid = canonical_doc.get('dct:identifier')
if docid is not None:
del canonical_doc['dct:identifier']
if docid is not None:
collection_url_plus_docid = f"ams-dcatd:{docid}"
if docurl is None:
docurl = collection_url_plus_docid
elif docurl != collection_url_plus_docid:
raise web.HTTPBadRequest(
text='{} != {}'.format(docurl, collection_url_plus_docid)
)
elif docurl is not None:
        if not docurl.startswith(datasets_url + '/'):
raise web.HTTPBadRequest(
text=f"{_DCAT_ID_KEY} must start with {datasets_url}/"
)
docid = docurl[len(datasets_url) + 1:]
if not re.fullmatch(r"(?:%[a-f0-9]{2}|[-\w:@!$&'()*+,;=.~])+", docid):
raise web.HTTPBadRequest(
text=f"Illegal value for {_DCAT_ID_KEY}"
)
else:
docid = await hooks.storage_id()
docurl = f"{datasets_url}/{docid}"
# Let the metadata plugin grab the full-text search representation
searchable_text = await hooks.mds_full_text_search_representation(
data=canonical_doc
)
identifiers_added = await _add_distribution_identifiers(request.app, canonical_doc)
if identifiers_added > 0:
logger.debug(f'Added {identifiers_added} identifiers for {docid}')
try:
new_etag = await hooks.storage_create(
app=request.app, docid=docid, doc=canonical_doc,
searchable_text=searchable_text, iso_639_1_code="nl"
)
except KeyError:
raise web.HTTPBadRequest(
text='Document with id {} already exists'.format(docid)
)
clear_open_api_cache()
return web.Response(
status=201, headers={'Etag': new_etag, 'Location': docurl},
content_type='text/plain'
)
def _datasets_url(request: web.Request) -> str:
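    """Return the absolute URL of the datasets collection."""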
return request.app.config['web']['baseurl'] + 'datasets'
def _csv_decode_line(s: str) -> T.Optional[T.Set[str]]:
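    """Decode a single CSV-encoded line (RFC 4180) into a set of strings.

    Returns ``None`` if the line can not be parsed as CSV.
    """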
reader = csv.reader([s])
try:
return set(next(iter(reader)))
except (csv.Error, StopIteration):
return None
def _split_comma_separated_stringset(s: str) -> T.Optional[T.Set[str]]:
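    """Split a comma-separated string into a set of segments.

    A comma or backslash inside a segment can be escaped with a backslash,
    e.g. ``foo,ba\\,r`` decodes to ``{'foo', 'ba,r'}``. Returns ``None`` if
    the string can not be parsed.
    """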
pos = 0
len_s = len(s)
retval = set()
while pos < len_s:
if pos != 0 and s[pos] == ',':
pos = pos + 1
        matches = _COMMA_SEPARATED_SEGMENT.match(s, pos)
if matches is None:
return None
match = matches[0]
pos = pos + len(match)
retval.add(_ESCAPED_CHARACTER.sub(r'\1', match))
return retval
async def _add_distribution_identifiers(app, canonical_doc: dict) -> int:
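    """Assign a ``dc:identifier`` to every distribution that lacks one.

    New identifiers are requested from the storage plugin and checked against
    the identifiers already present in the document to avoid collisions.
    Returns the number of identifiers that were added.
    """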
all_identifiers = set()
to_be_added = 0
for distribution in canonical_doc.get('dcat:distribution', []):
if 'dc:identifier' in distribution:
all_identifiers.add(distribution['dc:identifier'])
else:
to_be_added += 1
if to_be_added == 0:
return 0
for distribution in canonical_doc.get('dcat:distribution', []):
if 'dc:identifier' not in distribution:
selector_found = False
while not selector_found:
new_selector = await app.hooks.storage_id()
if new_selector not in all_identifiers:
distribution['dc:identifier'] = new_selector
selector_found = True
all_identifiers.add(new_selector)
return to_be_added