Skip to content

Commit

Permalink
adjust PROV for potential metadata updates - works for workflow, but …
Browse files Browse the repository at this point in the history
…not single CommandLineTool (relates to common-workflow-language/cwltool#2082)
  • Loading branch information
fmigneault committed Dec 7, 2024
1 parent 2a5a3ee commit 28af8a5
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 94 deletions.
2 changes: 1 addition & 1 deletion config/weaver.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ weaver.wps_metadata_identification_keywords=Weaver,WPS,OGC
# access constraints can be comma-separated
weaver.wps_metadata_identification_accessconstraints=NONE
weaver.wps_metadata_identification_fees=NONE
weaver.wps_metadata_provider_name=CRIM
weaver.wps_metadata_provider_name=Computer Research Institute of Montréal (CRIM)
weaver.wps_metadata_provider_url=http://pavics-weaver.readthedocs.org/en/latest/
weaver.wps_metadata_contact_name=Francis Charette-Migneault
weaver.wps_metadata_contact_position=Research Software Developer
Expand Down
6 changes: 4 additions & 2 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,8 @@ def links(self, container=None, self_link=None):
"title": "Job statistics collected following process execution."},
{"href": f"{job_url}/prov", "rel": "provenance", # unofficial
"title": "Job provenance collected following process execution."},
{"href": f"{job_url}/prov", "rel": "https://www.w3.org/ns/prov", # unofficial
"title": "Job provenance collected following process execution."},
])
else:
job_links.append({
Expand All @@ -1577,8 +1579,8 @@ def links(self, container=None, self_link=None):
job_links.extend([self_link_body, self_link_up])
link_meta = {"type": ContentType.APP_JSON, "hreflang": AcceptLanguage.EN_CA}
for link in job_links:
for meta, parma in link_meta.items():
link.setdefault(meta, parma)
for meta, param in link_meta.items():
link.setdefault(meta, param)
return job_links

def json(self, container=None): # pylint: disable=W0221,arguments-differ
Expand Down
255 changes: 164 additions & 91 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from cwltool.context import LoadingContext, RuntimeContext
from cwltool.cwlprov import provenance_constants as cwl_prov_const
from cwltool.cwlprov.ro import ResearchObject
from cwltool.cwlprov.writablebagfile import close_ro
from cwltool.cwlprov.writablebagfile import close_ro, packed_workflow
from cwltool.factory import Factory as CWLFactory, WorkflowStatus as CWLException
from cwltool.process import shortname, use_custom_schema
from cwltool.secrets import SecretStore
Expand Down Expand Up @@ -183,6 +183,7 @@
from cwltool.factory import Callable as CWLFactoryCallable
from cwltool.process import Process as ProcessCWL
from owslib.wps import WPSExecution
from prov.model import ProvDocument
from pywps.response.execute import ExecuteResponse

from weaver.datatype import Authentication, Job
Expand Down Expand Up @@ -1491,6 +1492,160 @@ def location(self, destination):
return self.storage.location(destination)


class WeaverResearchObject(ResearchObject):
"""
Defines extended provenance details related to `Weaver` operations and referencing the active server instance.
"""

def __init__(self, fs_access, temp_prefix_ro="tmp", orcid="", full_name="", ro_uuid=None, settings=None):
# type: (StdFsAccess, str, str, str, uuid.UUID, AnySettingsContainer) -> None
super(WeaverResearchObject, self).__init__(fs_access, temp_prefix_ro, orcid, full_name)

# rewrite auto-initialized random UUIDs with Weaver-specific references
self.ro_uuid = ro_uuid or uuid.uuid4()
self.base_uri = f"arcp://uuid,{self.ro_uuid}/"
self.settings = settings

# FIXME: improve "hook" strategy with CWLProv
# all the following is not called when the runner resolves to a 'cwltool.executors.SingleJobExecutor'
# (ie: when the CWL is a CommandLineTool). This is because its code sets the 'ProvenanceProfile' option
# with 'user_provenance=False' explicitly Sadly, this is our only availble "hook" mechanism for the time being.
# However, it works for a CWL Workflow since that variant passes down
# the 'user_provenance option that we set (ie: 'ProvenanceProfile' created in 'cwltool.workflow.Workflow').
# see https://github.com/common-workflow-language/cwltool/pull/2082 for alternative to resolve "properly"
def user_provenance(self, document):
# type: (ProvDocument) -> None
"""
Hook `Weaver` updates onto user provenance step.
Because of how the :class`ResearchObject` and :class:`cwltool.cwlprov.provenance_profile.ProvenanceProfile`
definitions are passed around, invoked, and finalized in the :mod:`cwltool` operations, attempting to update
the ``PROV`` definitions after execution of the tool would be too late, since all manifest and provenance files
would already have been written to disk. Instead, hook ourselves to this function that is invoked before the
procedure is completed to adjust or apply additional metadata.
"""
self.self_check()

# NOTE:
# the original operation does the following,
# but using the machine user means nothing in a docker running on a server
# (username, fullname) = _whoami()

weaver_url = get_weaver_url(self.settings)
weaver_desc = self.settings.get(
"weaver.wps_metadata_identification_abstract",
"Weaver OGC API Processes Server"
)
weaver_full_name = f"crim-ca/weaver:{weaver_version}"

# for whatever reason, this is done by a local 'host_provenance' function
# within the 'ProvenanceProfile.generate_prov_doc' method, and it applies
# by default the machine host FQDN, which is irrelevant inside a docker container
# instead, use actual information from the weaver server to populate the field
# note: because it is done here, option 'host_provenance' MUST be disabled to avoid duplicates
document.add_namespace(cwl_prov_const.UUID)
document.add_namespace(cwl_prov_const.ORCID)
document.add_namespace(cwl_prov_const.FOAF)
cwltool_agent = document.agent(
cwl_prov_const.ACCOUNT_UUID,
{
prov.constants.PROV_TYPE: cwl_prov_const.FOAF["OnlineAccount"],
prov.constants.PROV_LABEL: weaver_desc,
prov.constants.PROV_LOCATION: weaver_url,
cwl_prov_const.FOAF["accountName"]: weaver_full_name,
},
)

full_name = self.full_name or "undefined"
user_agent = document.agent(
self.orcid or cwl_prov_const.USER_UUID, # actual user if provided or alias for machine
{
prov.constants.PROV_TYPE: prov.constants.PROV["Person"],
prov.constants.PROV_LABEL: "User running the workflow job.",
cwl_prov_const.FOAF["name"]: full_name,
cwl_prov_const.FOAF["account"]: cwltool_agent,
},
)
# cwltool may be started on the shell (directly by user),
# by shell script (indirectly by user)
# or from a different program
# (which again is launched by any of the above)
#
# We can't tell in which way, but ultimately we're still
# acting in behalf of that user (even if we might
# get their name wrong!)
document.actedOnBehalfOf(cwltool_agent, user_agent)

document.add_namespace("doi", "https://doi.org/")
sha1_ns = document._namespaces.get_namespace("sha1")

crim_name = "Computer Research Institute of Montréal"
crim_entity = document.entity(
"_:crim",
{
prov.constants.PROV_TYPE: prov.constants.PROV["Organization"],
"foaf:name": crim_name,
"schema:name": crim_name,
}
)

server_provider_name = self.settings.get("weaver.wps_metadata_provider_name", weaver_url)
server_provider_entity = document.entity(
"_:server",
{
prov.constants.PROV_TYPE: prov.constants.PROV["Organization"],
"foaf:name": server_provider_name,
"schema:name": server_provider_name,
}
)

weaver_sha1 = hashlib.sha1(weaver_url)
weaver_agent = document.agent(
sha1_ns.qname(weaver_sha1),
{
prov.constants.PROV_TYPE: prov.constants.PROV["SoftwareAgent"],
prov.constants.PROV_LOCATION: weaver_url,
prov.constants.PROV_LABEL: weaver_full_name,
# "prov:qualifiedPrimarySource":
# "prov:Organization": "Computer Research Institute of Montréal (CRIM).",
# "foaf:Project": "https://github.com/crim-ca/weaver",
# "doi": "10.5281/zenodo.14210717" # see CITATION.cff
}
)

# cross-ref: https://wf4ever.github.io/ro/wfprov.owl
job_entity = document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["ProcessRun"],
prov.constants.PROV_LOCATION: self.job.job_url(self.settings),
prov.constants.PROV_LABEL: "Job Information",
}
)
proc_entity = document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["Process"],
prov.constants.PROV_LOCATION: self.job.process_url(self.settings),
prov.constants.PROV_LABEL: "Process Description",
}
)

wf_agent = document.get_record(self.engine_uuid) # current job run aligned with cwl workflow

# define relationships
document.actedOnBehalfOf(weaver_agent, user_agent)
document.specializationOf(weaver_agent, cwltool_agent)
document.attribution(crim_entity, weaver_agent)
document.wasDerivedFrom(cwltool_agent, weaver_agent)
document.derivation(server_provider_entity, weaver_agent)
document.wasStartedBy(job_entity, weaver_agent)
document.wasStartedBy(wf_agent, job_entity, time=self.job.created)
document.specializationOf(wf_agent, job_entity)
document.alternateOf(wf_agent, job_entity)
document.wasGeneratedBy(job_entity, proc_entity)


class WpsPackage(Process):
def __init__(
self,
Expand Down Expand Up @@ -1836,114 +1991,32 @@ def setup_provenance(self, loading_context, runtime_context):
return

loading_context.user_provenance = True
loading_context.host_provenance = True
loading_context.host_provenance = False # see 'WeaverResearchObject.user_provenance'

fs = runtime_context.make_fs_access or StdFsAccess
if not runtime_context.research_obj:
ro = ResearchObject(
ro = WeaverResearchObject(
fs(""),
temp_prefix_ro=runtime_context.tmpdir_prefix,
orcid=runtime_context.orcid,
full_name=runtime_context.cwl_full_name,
ro_uuid=self.job.uuid, # align the RO definition with the job (make the UUIDs logical)
settings=self.settings,
)

# rewrite auto-initialized random UUIDs with Weaver-specific references
ro.ro_uuid = self.job.uuid
ro.base_uri = f"arcp://uuid,{ro.ro_uuid}/"

loading_context.research_obj = ro
runtime_context.research_obj = ro

def finalize_provenance(self, runtime_context):
# type: (RuntimeContext) -> None
if runtime_context.research_obj:
ro = runtime_context.research_obj
prov_obj = runtime_context.prov_obj

# FIXME: all in try/except fails because 'prov_obj' is unset
# (operation already performed before we reach here! - find a way to hook ourselves during the operation)
# the actual creation of 'cwltool.cwlprov.provenance_profile.ProvenanceProfile'
# happens within one of the 'cwltool.executors.JobExecutor', which ends up
# calling 'process.parent_wf.finalize_prov_profile' directly before the end
# of 'cwltool.executors.JobExecutor.execute', which in turns generates all the PROV files
try:
prov_obj.document.add_namespace("doi", "https://doi.org/")
sha1_ns = prov_obj.document._namespaces.get_namespace("sha1")

crim_name = "Computer Research Institute of Montréal"
crim_entity = prov_obj.document.entity(
"_:crim",
{
prov.constants.PROV_TYPE: prov.constants.PROV["Organization"],
"foaf:name": crim_name,
"schema:name": crim_name,
}
)

weaver_url = get_weaver_url(self.settings)
weaver_sha1 = hashlib.sha1(weaver_url)
weaver_agent = prov_obj.document.agent(
sha1_ns.qname(weaver_sha1),
{
prov.constants.PROV_TYPE: prov.constants.PROV["SoftwareAgent"],
prov.constants.PROV_LOCATION: weaver_url,
prov.constants.PROV_LABEL: f"crim-ca/weaver {weaver_version}",
# "prov:qualifiedPrimarySource":
# "prov:Organization": "Computer Research Institute of Montréal (CRIM).",
# "foaf:Project": "https://github.com/crim-ca/weaver",
# "doi": "10.5281/zenodo.14210717" # see CITATION.cff
}
)

# cross-ref: https://wf4ever.github.io/ro/wfprov.owl
job_entity = prov_obj.document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["ProcessRun"],
prov.constants.PROV_LOCATION: self.job.job_url(self.settings),
prov.constants.PROV_LABEL: "Job Information",
}
)
proc_entity = prov_obj.document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["Process"],
prov.constants.PROV_LOCATION: self.job.process_url(self.settings),
prov.constants.PROV_LABEL: "Process Description",
}
)

cwl_agent = prov_obj.document.get_record(cwl_prov_const.ACCOUNT_UUID) # cwltool
usr_agent = prov_obj.document.get_record(cwl_prov_const.USER_UUID) # pseudo-user (machine user)
wf_agent = prov_obj.document.get_record(ro.engine_uuid) # current job run aligned with cwl workflow

# FIXME: patch override of 'host_provenance' since access through RO it is not possible
# (private function in cwltool.cwlprov.provenance_profile.ProvenanceProfile.generate_prov_doc
# cwl_agent.extend()
# document.agent(
# ACCOUNT_UUID,
# {
# PROV_TYPE: FOAF["OnlineAccount"],
# "prov:location": hostname,
# CWLPROV["hostname"]: hostname,
# },
# )

# define relationships
prov_obj.document.actedOnBehalfOf(weaver_agent, usr_agent)
prov_obj.document.specializationOf(weaver_agent, cwl_agent)
prov_obj.document.attribution(crim_entity, weaver_agent)
prov_obj.document.wasDerivedFrom(cwl_agent, weaver_agent)
# prov_obj.document.wasStartedBy(job_agent, weaver_agent)
prov_obj.document.wasStartedBy(wf_agent, job_entity, time=self.job.created)
# prov_obj.document.specializationOf(wf_agent, job_entity)
# prov_obj.document.alternateOf(wf_agent, job_entity)
except:
pass
# perform packaging of the workflow
packed_wf_str = repr_json(self.package, force_string=True, indent=2)
packed_workflow(runtime_context.research_obj, packed_wf_str)

# sign-off and persist completed PROV
prov_dir = self.job.prov_path(self.settings)
close_ro(ro, prov_dir)
close_ro(runtime_context.research_obj, prov_dir)

def update_requirements(self):
# type: () -> None
Expand Down

0 comments on commit 28af8a5

Please sign in to comment.