Source code for owmeta.data_trans.data_with_evidence_ds
from rdflib.namespace import Namespace
from owmeta_core.context import Context
from owmeta_core.datasource import Informational, DataSource
from owmeta_core.data_trans.context_datasource import VariableIdentifierContext
from .. import CONTEXT, SCI_CTX
from .common_data import DSMixin
[docs]class DataWithEvidenceDataSource(DSMixin, DataSource):
'''
A data source that has an "evidence context" containing statements which support those
in its "data context". The data source also has a combined context which imports both
the data and evidence contexts.
'''
class_context = SCI_CTX
evidence_context_property = Informational(display_name='Evidence context',
property_name='evidence_context',
property_type='ObjectProperty',
multiple=False,
description='The context in which evidence'
' for the "Data context" is defined')
data_context_property = Informational(display_name='Data context',
property_name='data_context',
property_type='ObjectProperty',
multiple=False,
description='The context in which primary data'
' for this data source is defined')
combined_context_property = Informational(display_name='Combined context',
property_name='combined_context',
property_type='ObjectProperty',
multiple=False,
description='Context importing both the data and evidence contexts')
def __init__(self, *args, **kwargs):
super(DataWithEvidenceDataSource, self).__init__(*args, **kwargs)
self.__ad_hoc_contexts = dict()
self.data_context = _DataContext.contextualize(self.context)(maker=self,
imported=(CONTEXT,))
self.evidence_context = _EvidenceContext.contextualize(self.context)(maker=self,
imported=(CONTEXT,))
self.combined_context = _CombinedContext.contextualize(self.context)(maker=self,
imported=(self.data_context,
self.evidence_context))
if not type(self).query_mode:
if not self.data_context_property.has_defined_value():
self.data_context_property(self.data_context.rdf_object)
if not self.evidence_context_property.has_defined_value():
self.evidence_context_property(self.evidence_context.rdf_object)
if not self.combined_context_property.has_defined_value():
self.combined_context_property(self.combined_context.rdf_object)
def data_context_for(self, **kwargs):
ctx = self.context_for(**kwargs)
self.data_context.add_import(ctx)
return ctx
def context_for(self, ident=None, **kwargs):
key = "&".join(k + "=" + getattr(kwargs[k], 'identifier', str(kwargs[k]))
for k in sorted(kwargs.keys()))
res = self.__ad_hoc_contexts.get(key)
if res is None:
if ident:
ctxid = ident
else:
ctxid = self.identifier + '/context_for?' + key
self.__ad_hoc_contexts[key] = Context.contextualize(self.context)(ident=ctxid)
res = self.__ad_hoc_contexts[key]
return res
def commit_augment(self):
for ctx in self.__ad_hoc_contexts.values():
ctx.save()
self.evidence_context.save()
self.data_context.save()
self.combined_context.save()
for ctx in self.__ad_hoc_contexts.values():
ctx.save_imports(transitive=False)
self.evidence_context.save_imports(transitive=False)
self.data_context.save_imports(transitive=False)
self.combined_context.save_imports(transitive=False)
class _CombinedContext(VariableIdentifierContext):
def identifier_helper(self):
if self.maker.defined:
return self.maker.identifier
else:
return None
class _EvidenceContext(VariableIdentifierContext):
def identifier_helper(self):
if self.maker.defined:
return self.maker.identifier + '-evidence'
else:
return None
class _DataContext(VariableIdentifierContext):
def identifier_helper(self):
if self.maker.defined:
return self.maker.identifier + '-data'
else:
return None