diff --git a/case.jsonld b/case.jsonld index 14aa48c..598480d 100644 --- a/case.jsonld +++ b/case.jsonld @@ -16,25 +16,6 @@ "uco-core:createdBy": { "@id": "kb:aef8e4c4-db83-59fd-8f71-b65cd7676c0a" }, - "@context": { - "@vocab": "http://caseontology.org/core#", - "case-investigation": "https://ontology.caseontology.org/case/investigation/", - "drafting": "http://example.org/ontology/drafting/", - "co": "http://purl.org/co/", - "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", - "uco-action": "https://ontology.unifiedcyberontology.org/uco/action/", - "uco-core": "https://ontology.unifiedcyberontology.org/uco/core/", - "uco-identity": "https://ontology.unifiedcyberontology.org/uco/identity/", - "uco-location": "https://ontology.unifiedcyberontology.org/uco/location/", - "uco-role": "https://ontology.unifiedcyberontology.org/uco/role/", - "uco-observable": "https://ontology.unifiedcyberontology.org/uco/observable/", - "uco-tool": "https://ontology.unifiedcyberontology.org/uco/tool/", - "uco-types": "https://ontology.unifiedcyberontology.org/uco/types/", - "uco-vocabulary": "https://ontology.unifiedcyberontology.org/uco/vocabulary/", - "xsd": "http://www.w3.org/2001/XMLSchema#", - "kb": "http://example.org/kb/" - }, "uco-core:object": [ { "@id": "kb:aef8e4c4-db83-59fd-8f71-b65cd7676c0a", @@ -579,7 +560,6 @@ "@id": "kb:a19e1e1f-3953-5fb9-92b6-2b46f85752b2", "@type": "case-investigation:Investigation", "uco-core:name": "Crime A", - "case-investigation:focus": "Transfer of Illicit Materials", "uco-core:description": "Inquiry into the transfer of illicit materials and the devices used to do so", "uco-core:object": [ { @@ -710,7 +690,8 @@ } ] } - ] + ], + "case-investigation:focus": "Transfer of Illicit Materials" }, { "@id": "kb:6f79d4ae-d92c-5cad-bbe5-a0afde6f475a", @@ -1853,5 +1834,24 @@ } ] } - ] + ], + "@context": { + "@vocab": "http://caseontology.org/core#", + "case-investigation": "https://ontology.caseontology.org/case/investigation/", + "drafting": "http://example.org/ontology/drafting/", + "co": "http://purl.org/co/", + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "rdfs": "http://www.w3.org/2000/01/rdf-schema#", + "uco-action": "https://ontology.unifiedcyberontology.org/uco/action/", + "uco-core": "https://ontology.unifiedcyberontology.org/uco/core/", + "uco-identity": "https://ontology.unifiedcyberontology.org/uco/identity/", + "uco-location": "https://ontology.unifiedcyberontology.org/uco/location/", + "uco-role": "https://ontology.unifiedcyberontology.org/uco/role/", + "uco-observable": "https://ontology.unifiedcyberontology.org/uco/observable/", + "uco-tool": "https://ontology.unifiedcyberontology.org/uco/tool/", + "uco-types": "https://ontology.unifiedcyberontology.org/uco/types/", + "uco-vocabulary": "https://ontology.unifiedcyberontology.org/uco/vocabulary/", + "xsd": "http://www.w3.org/2001/XMLSchema#", + "kb": "http://example.org/kb/" + } } diff --git a/case_mapping/case/investigation.py b/case_mapping/case/investigation.py index 39f6fc2..1e651d1 100644 --- a/case_mapping/case/investigation.py +++ b/case_mapping/case/investigation.py @@ -5,6 +5,7 @@ from ..base import Facet, UcoObject from ..uco.action import Action +from ..uco.core import ContextualCompilation from ..uco.location import Location @@ -46,8 +47,8 @@ def __init__( self["@type"] = "case-investigation:InvestigativeAction" -class CaseInvestigation(UcoObject): - def __init__(self, name=None, focus=None, description=None, core_objects=None): +class CaseInvestigation(ContextualCompilation): + def __init__(self, *args: Any, focus=None, **kwargs: Any) -> None: """ An investigative action is a CASE object that represents the who, where, when of investigation :param name: The name of an investigation (e.g., Murder of Suspect B,.) @@ -57,16 +58,13 @@ def __init__(self, name=None, focus=None, description=None, core_objects=None): object e.g., Persons involved in investigation, Investigation into a Murder, object refrences a case-object for a phone investigative action """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "case-investigation:Investigation" self._str_vars( **{ - "uco-core:name": name, "case-investigation:focus": focus, - "uco-core:description": description, } ) - self.append_core_objects(core_objects) class ProvenanceRecord(UcoObject): diff --git a/case_mapping/drafting/entities.py b/case_mapping/drafting/entities.py index 0e4d636..e0e4d63 100755 --- a/case_mapping/drafting/entities.py +++ b/case_mapping/drafting/entities.py @@ -1,3 +1,5 @@ +from typing import Any + from ..base import Facet, UcoObject, unpack_args_array @@ -175,6 +177,7 @@ def __init__( class SocialMediaActivityFacet(Facet): def __init__( self, + *args: Any, body=None, page_title=None, author_identifier=None, @@ -187,7 +190,8 @@ def __init__( created_time=None, application=None, url=None, - ): + **kwargs: Any, + ) -> None: """ Used to represent activity on social platfomrs :param body: The text of the post/message @@ -203,7 +207,7 @@ def __init__( :param application: the application used for creating the post :param application: the URL of the post """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = ["drafting:SocialMediaActivityFacet", "uco-core:Facet"] diff --git a/case_mapping/uco/core.py b/case_mapping/uco/core.py index 65f5ac8..5b9ad9a 100644 --- a/case_mapping/uco/core.py +++ b/case_mapping/uco/core.py @@ -1,12 +1,76 @@ from datetime import datetime -from typing import Any, List, Optional, Union +from typing import Any, Optional, Sequence, Union from pytz import timezone from ..base import UcoObject, unpack_args_array -class Bundle(UcoObject): +class Compilation(UcoObject): + def __init__( + self, + *args: Any, + core_objects: Optional[Sequence[UcoObject]] = None, + **kwargs: Any, + ) -> None: + """ + A compilation is a grouping of things. + """ + super().__init__(*args, **kwargs) + self["@type"] = "uco-core:Compilation" + if core_objects is not None and len(core_objects) > 0: + self.append_core_objects(core_objects) + + @unpack_args_array + def append_to_uco_object(self, *args) -> None: + """ + Add a single/tuple of result(s) to the list of outputs from an action + :param args: A CASE object, or objects, often an observable. (e.g., one of many devices from a search operation) + """ + self._append_observable_objects("uco-core:object", *args) + + +class ContextualCompilation(Compilation): + def __init__( + self, + *args: Any, + core_objects: Sequence[UcoObject], + **kwargs: Any, + ) -> None: + """ + A contextual compilation is a grouping of things sharing some context (e.g., a set of network connections observed on a given day, all accounts associated with a given person). + + Future implementation note: At and before CASE 1.3.0, at least one core:object must be supplied at instantiation time of a contextual compilation. At and after CASE 1.4.0, these objects will be optional. + """ + if len(core_objects) == 0: + raise ValueError( + "A ContextualCompilation is required to have at least one UcoObject to link at initiation time. This will become optional in CASE 1.4.0." + ) + super().__init__(*args, **kwargs) + self["@type"] = "uco-core:ContextualCompilation" + self.append_core_objects(core_objects) + + +class EnclosingCompilation(Compilation): + def __init__( + self, + *args: Any, + core_objects: Sequence[UcoObject], + **kwargs: Any, + ) -> None: + """ + An enclosing compilation is a container for a grouping of things. + """ + if len(core_objects) == 0: + raise ValueError( + "An EnclosingCompilation is required to have at least one UcoObject to link at initiation time." + ) + super().__init__(*args, **kwargs) + self["@type"] = "uco-core:EnclosingCompilation" + self.append_core_objects(core_objects) + + +class Bundle(EnclosingCompilation): def __init__( self, *args: Any, @@ -14,6 +78,8 @@ def __init__( ) -> None: """ The main CASE Object for representing a case and its activities and objects. + + Instantiating this class requires a starter sequence (set, list, or tuple) to be passed using the core_objects parameter. (See EnclosingCompilation.) To confirm conformant CASE will be generated, at least one UcoObject must be passed in this list. However, this does not initially need to be the complete sequence of objects that will be in this Bundle. Other UcoObjects can be added after initialization with bundle.append_to_uco_object. """ super().__init__(*args, **kwargs) self.build = [] # type: ignore @@ -39,6 +105,7 @@ def __init__( # Assign caller-selectible prefix label and IRI, after checking # for conflicts with hard-coded prefixes. # https://www.w3.org/TR/turtle/#prefixed-name + assert isinstance(self["@context"], dict) if self.prefix_label in self["@context"]: raise ValueError( "Requested prefix label already in use in hard-coded dictionary: '%s'. Please revise caller to use another label." @@ -51,14 +118,6 @@ def __init__( def append_to_case_graph(self, *args): self._append_observable_objects("@graph", *args) - @unpack_args_array - def append_to_uco_object(self, *args): - """ - Add a single/tuple of result(s) to the list of outputs from an action - :param args: A CASE object, or objects, often an observable. (e.g., one of many devices from a search operation) - """ - self._append_observable_objects("uco-core:object", *args) - @unpack_args_array def append_to_rdfs_comments(self, *args): self._append_strings("rdfs:comment", *args) @@ -119,4 +178,8 @@ def _addtime(self, _type: str) -> None: } -directory = {"uco-core:Bundle": Bundle} +directory = { + "uco-core:Bundle": Bundle, + "uco-core:Compilation": Compilation, + "uco-core:ContextualCompilation": ContextualCompilation, +} diff --git a/case_mapping/uco/identity.py b/case_mapping/uco/identity.py index 7f2d31c..954c4c0 100644 --- a/case_mapping/uco/identity.py +++ b/case_mapping/uco/identity.py @@ -1,14 +1,14 @@ -from typing import Dict, Optional +from typing import Any, Dict, Optional from ..base import Facet, IdentityAbstraction, UcoObject class BirthInformationFacet(Facet): - def __init__(self, birthdate=None): + def __init__(self, *args: Any, birthdate=None, **kwargs: Any) -> None: """ :param birthdate: the date of birth of an identity """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-identity:BirthInformationFacet" self._datetime_vars(**{"uco-identity:birthdate": birthdate}) @@ -31,12 +31,14 @@ def __init__(self, name: Optional[str] = None, facets=None): class SimpleNameFacet(Facet): - def __init__(self, given_name=None, family_name=None): + def __init__( + self, *args: Any, given_name=None, family_name=None, **kwargs: Any + ) -> None: """ :param given_name: Full name of the identity of person :param family_name: Family name of identity of person """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-identity:SimpleNameFacet" self._str_vars( **{ diff --git a/case_mapping/uco/location.py b/case_mapping/uco/location.py index ac8a68b..c8deb2f 100644 --- a/case_mapping/uco/location.py +++ b/case_mapping/uco/location.py @@ -1,13 +1,12 @@ -from typing import Optional +from typing import Any, Optional from ..base import Facet, UcoObject class Location(UcoObject): - def __init__(self, facets=None): - super().__init__() + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) self["@type"] = "uco-location:Location" - self.append_facets(facets) class LatLongCoordinatesFacet(Facet): diff --git a/case_mapping/uco/observable.py b/case_mapping/uco/observable.py index fdee589..75f247b 100644 --- a/case_mapping/uco/observable.py +++ b/case_mapping/uco/observable.py @@ -209,7 +209,9 @@ def __init__( class AccountFacet(Facet): - def __init__(self, identifier=None, is_active=True, issuer_id=None): + def __init__( + self, *args: Any, identifier=None, is_active=True, issuer_id=None, **kwargs: Any + ) -> None: """ Used to represent user accounts :param is_active: Active unless specified otherwise (False) @@ -217,7 +219,7 @@ def __init__(self, identifier=None, is_active=True, issuer_id=None): :param issuer_id: The id of issuing body for application (e.g., kb:organization-skypeapp-cc44c2ae-bdd3-4df8-9ca3-1f58d682d62b) """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:AccountFacet" self._bool_vars(**{"uco-observable:isActive": is_active}) self._str_vars( @@ -292,6 +294,7 @@ def __init__( class ContentDataFacet(Facet): def __init__( self, + *args: Any, byte_order=None, magic_number=None, mime_type=None, @@ -301,7 +304,8 @@ def __init__( is_encrypted=None, hash_method=None, hash_value=None, - ): + **kwargs: Any, + ) -> None: """ The characteristics of a block of digital data. :param byte_order: Byte order of data. Example - "BigEndian" @@ -314,7 +318,7 @@ def __init__( :param hash_method: The algorithm used to calculate the hash value :param hash_value: The cryptographic hash of this content """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:ContentDataFacet" self._str_vars( **{ @@ -334,7 +338,7 @@ def __init__( } if hash_method is not None or hash_value is not None or hash_value != "-": - data = { + data: dict[str, Any] = { "@id": self.prefix_label + ":" + str(local_uuid()), "@type": "uco-types:Hash", } @@ -392,7 +396,15 @@ def __init__(self, range_offset=None, range_size=None): class DeviceFacet(Facet): - def __init__(self, device_type=None, manufacturer=None, model=None, serial=None): + def __init__( + self, + *args: Any, + device_type=None, + manufacturer=None, + model=None, + serial=None, + **kwargs: Any, + ) -> None: """ Characteristics of a piece of electronic equipment. :param device_type: The type of device (e.g., "camera") @@ -400,7 +412,7 @@ def __init__(self, device_type=None, manufacturer=None, model=None, serial=None) :param model: The model of the device (e.g., "Powershot SX540") :param serial: The serial phone_number of the device (e.g., "1296-3219-8792-CL918") """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:DeviceFacet" self._node_reference_vars(**{"uco-observable:manufacturer": manufacturer}) self._str_vars( @@ -519,7 +531,9 @@ def __init__( class UrlHistoryFacet(Facet): - def __init__(self, browser=None, history_entries=None): + def __init__( + self, *args: Any, browser=None, history_entries=None, **kwargs: Any + ) -> None: """ :param browser_info: An observable object containing a URLHistoryFacet :param history_entries: A list of dictionaries, each dict has the @@ -536,7 +550,7 @@ def __init__(self, browser=None, history_entries=None): "uco-observable:url": url_object, "uco-observable:visitCount": int, """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:URLHistoryFacet" self._node_reference_vars( **{ @@ -561,7 +575,7 @@ def __init__(self, browser=None, history_entries=None): self["uco-observable:urlHistoryEntry"] = [] for entry in history_entries: - history_entry = {} + history_entry: dict[str, Any] = dict() history_entry["@id"] = self.prefix_label + ":" + local_uuid() history_entry["@type"] = "uco-observable:URLHistoryEntry" for key, var in entry.items(): @@ -619,6 +633,7 @@ def __init__(self, browser=None, history_entries=None): class UrlFacet(Facet): def __init__( self, + *args: Any, url_address=None, url_port=None, url_host=None, @@ -628,7 +643,8 @@ def __init__( url_query=None, url_scheme=None, url_username=None, - ): + **kwargs: Any, + ) -> None: """ :param url_address: an address of a url (i.e. google.ie) :param url_port: a tcp or udp port of a url for example 3000 @@ -640,7 +656,7 @@ def __init__( :param url_scheme: Identifies the type of URL. (e.g. ssh://) :param url_username: A username that may be required for authentication for a specific resource. (login) """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:URLFacet" self._str_vars( **{ @@ -701,13 +717,15 @@ def __init__( class RasterPictureFacet(Facet): def __init__( self, + *args: Any, camera_id=None, bits_per_pixel=None, picture_height=None, picture_width=None, image_compression_method=None, picture_type=None, - ): + **kwargs: Any, + ) -> None: """ This CASEObject represents the contents of a file or device :param camera_id: An observable cyberitem @@ -717,7 +735,7 @@ def __init__( :param image_compression_method: The compression method used :param picture_type: The type of picture ("jpg", "png" etc.) """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:RasterPictureFacet" self._str_vars( **{ @@ -786,11 +804,11 @@ def __init__( class PhoneAccountFacet(Facet): - def __init__(self, phone_number=None): + def __init__(self, *args: Any, phone_number=None, **kwargs: Any) -> None: """ :param phone_number: The number for this account (e.g., "+16503889249") """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:PhoneAccountFacet" self._str_vars( **{ @@ -800,22 +818,24 @@ def __init__(self, phone_number=None): class EmailAccountFacet(Facet): - def __init__(self, email_address): + def __init__(self, *args: Any, email_address, **kwargs: Any) -> None: """ :param email_address: An ObservableObject (with EmailAdressFacet) """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:EmailAccountFacet" self._node_reference_vars(**{"uco-observable:emailAddress": email_address}) class EmailAddressFacet(Facet): - def __init__(self, email_address_value=None, display_name=None): + def __init__( + self, *args: Any, email_address_value=None, display_name=None, **kwargs: Any + ) -> None: """ Used to represent the value of an email address. :param email_address_value: a single email address (e.g., "bob@example.com") """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:EmailAddressFacet" self._str_vars( **{ @@ -828,6 +848,7 @@ def __init__(self, email_address_value=None, display_name=None): class EmailMessageFacet(Facet): def __init__( self, + *args: Any, msg_to=None, msg_from=None, cc=None, @@ -853,7 +874,8 @@ def __init__( is_mime_encoded=None, allocation_status=None, is_multipart=None, - ): + **kwargs: Any, + ) -> None: """ An instance of an email message, corresponding to the internet message format described in RFC 5322 and related. :param msg_to: A list of ObservableObjects (with EmailAccountFacet) @@ -882,7 +904,7 @@ def __init__( :param is_multipart: A boolean True/False :param allocation_status: """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:EmailMessageFacet" self._str_vars( **{ @@ -928,12 +950,14 @@ def __init__( class EXIFFacet(Facet): - def __init__(self, **kwargs): + def __init__( + self, *args: Any, exif_key_value_pairs: dict[str, str], **kwargs: Any + ) -> None: """ Specifies exchangeable image file format (Exif) metadata tags for image and sound files recorded by digital cameras. :param kwargs: The user provided key/value pairs of exif items (e.g., Make="Canon", etc.). """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:EXIFFacet" self["uco-observable:exifData"] = { @@ -941,7 +965,7 @@ def __init__(self, **kwargs): "@type": "uco-types:ControlledDictionary", "uco-types:entry": [], } - for k, v in kwargs.items(): + for k, v in exif_key_value_pairs.items(): if v not in ["", " "]: item = { "@id": self.prefix_label + ":" + str(local_uuid()), @@ -1147,6 +1171,7 @@ def __init__( class MessageFacet(Facet): def __init__( self, + *args: Any, msg_to=None, msg_from=None, message_text=None, @@ -1155,7 +1180,8 @@ def __init__( message_type=None, message_id=None, session_id=None, - ): + **kwargs: Any, + ) -> None: """ Characteristics of an electronic message. :param msg_to: A list of ObservableObjects @@ -1167,7 +1193,7 @@ def __init__( :param message_id: A unique identifier for the message. :param session_id: The priority of the email. """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:MessageFacet" self._str_vars( **{ @@ -1434,13 +1460,13 @@ def __init__( class ApplicationAccountFacet(Facet): - def __init__(self, application=None): + def __init__(self, *args: Any, application=None, **kwargs: Any) -> None: """ An application account facet is a grouping of characteristics unique to an account within a particular software program designed for end users. :param application: An Observable Object (containing an Application Facet) """ - super().__init__() + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:ApplicationAccountFacet" self._node_reference_vars(**{"uco-observable:application": application}) @@ -1620,12 +1646,10 @@ def append_participants(self, *args): self._append_refs("uco-observable:participant", *args) -class MessageThread(UcoObject): - def __init__(self, name=None, facets=None): - super().__init__() +class MessageThread(ObservableObject): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) self["@type"] = "uco-observable:MessageThread" - self._str_vars(**{"uco-core:name": name}) - self.append_facets(facets) class Message(ObservableObject): diff --git a/example.py b/example.py index 48c0f73..8fd53f0 100755 --- a/example.py +++ b/example.py @@ -39,6 +39,7 @@ def _next_timestamp() -> datetime: bundle_modified_time = datetime.strptime("2024-05-02T21:38:19", "%Y-%m-%dT%H:%M:%S") bundle = uco.core.Bundle( + core_objects=[bundle_identity], created_by=bundle_identity, description="An Example Case File", modified_time=bundle_modified_time, @@ -47,7 +48,6 @@ def _next_timestamp() -> datetime: spec_version="UCO/CASE 1.3", tag="Artifacts extracted from a mobile phone", ) -bundle.append_to_uco_object(bundle_identity) investigation_items: list[base.UcoObject] = [] @@ -117,7 +117,7 @@ def _next_timestamp() -> datetime: ) exif = {"Make": "Canon", "Model": "Powershot"} -file_exif1 = uco.observable.EXIFFacet(**exif) +file_exif1 = uco.observable.EXIFFacet(exif_key_value_pairs=exif) sd_card.append_facets(file1, file_content1, file_raster1, file_exif1) bundle.append_to_uco_object(sd_card)