Source code for pycti.entities.opencti_stix_domain_object

# coding: utf-8

import json
import os

import magic


[docs] class StixDomainObject: """Main StixDomainObject class for OpenCTI Manages STIX Domain Objects in the OpenCTI platform. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """
[docs] def __init__(self, opencti): """Initialize the StixDomainObject instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types spec_version created_at updated_at objectOrganization { id standard_id name } createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified } } } revoked confidence created modified ... on AttackPattern { name description aliases x_mitre_platforms x_mitre_permissions_required x_mitre_detection x_mitre_id killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Campaign { name description aliases first_seen last_seen objective } ... on Note { attribute_abstract content authors note_types likelihood objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on ObservedData { first_observed last_observed number_observed objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Opinion { explanation authors opinion objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Report { name description report_types published objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Grouping { name description context objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on CourseOfAction { name description x_opencti_aliases } ... on DataComponent { name description dataSource { id standard_id entity_type parent_types spec_version created_at updated_at revoked confidence created modified name description x_mitre_platforms collection_layers } } ... on DataSource { name description x_mitre_platforms collection_layers } ... on Individual { name description x_opencti_aliases contact_information x_opencti_firstname x_opencti_lastname } ... on Organization { name description x_opencti_aliases contact_information x_opencti_organization_type x_opencti_reliability } ... on Sector { name description x_opencti_aliases contact_information } ... on System { name description x_opencti_aliases } ... on Indicator { pattern_type pattern_version pattern name description indicator_types valid_from valid_until x_opencti_score x_opencti_detection x_opencti_main_observable_type } ... on Infrastructure { name description aliases infrastructure_types first_seen last_seen } ... on IntrusionSet { name description aliases first_seen last_seen goals resource_level primary_motivation secondary_motivations } ... on City { name description latitude longitude precision x_opencti_aliases } ... on Country { name description latitude longitude precision x_opencti_aliases } ... on Region { name description latitude longitude precision x_opencti_aliases } ... on Position { name description latitude longitude precision x_opencti_aliases street_address postal_code } ... on Malware { name description aliases malware_types is_family first_seen last_seen architecture_execution_envs implementation_languages capabilities killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on MalwareAnalysis { product version configuration_version modules analysis_engine_version analysis_definition_version submitted analysis_started analysis_ended result_name result } ... on ThreatActor { name description aliases threat_actor_types first_seen last_seen roles goals sophistication resource_level primary_motivation secondary_motivations personal_motivations } ... on Tool { name description aliases tool_types tool_version killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Event { name description aliases event_types } ... on Channel { name description aliases channel_types } ... on Narrative { name description aliases narrative_types } ... on DataComponent { name description } ... on DataSource { name description } ... on Case { name description objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Vulnerability { name description x_opencti_aliases x_opencti_cvss_vector_string x_opencti_cvss_base_score x_opencti_cvss_base_severity x_opencti_cvss_attack_vector x_opencti_cvss_attack_complexity x_opencti_cvss_privileges_required x_opencti_cvss_user_interaction x_opencti_cvss_scope x_opencti_cvss_confidentiality_impact x_opencti_cvss_integrity_impact x_opencti_cvss_availability_impact x_opencti_cvss_exploit_code_maturity x_opencti_cvss_remediation_level x_opencti_cvss_report_confidence x_opencti_cvss_temporal_score x_opencti_cvss_v2_vector_string x_opencti_cvss_v2_base_score x_opencti_cvss_v2_access_vector x_opencti_cvss_v2_access_complexity x_opencti_cvss_v2_authentication x_opencti_cvss_v2_confidentiality_impact x_opencti_cvss_v2_integrity_impact x_opencti_cvss_v2_availability_impact x_opencti_cvss_v2_exploitability x_opencti_cvss_v2_remediation_level x_opencti_cvss_v2_report_confidence x_opencti_cvss_v2_temporal_score x_opencti_cvss_v4_vector_string x_opencti_cvss_v4_base_score x_opencti_cvss_v4_base_severity x_opencti_cvss_v4_attack_vector x_opencti_cvss_v4_attack_complexity x_opencti_cvss_v4_attack_requirements x_opencti_cvss_v4_privileges_required x_opencti_cvss_v4_user_interaction x_opencti_cvss_v4_confidentiality_impact_v x_opencti_cvss_v4_confidentiality_impact_s x_opencti_cvss_v4_integrity_impact_v x_opencti_cvss_v4_integrity_impact_s x_opencti_cvss_v4_availability_impact_v x_opencti_cvss_v4_availability_impact_s x_opencti_cvss_v4_exploit_maturity x_opencti_cwe x_opencti_cisa_kev x_opencti_epss_score x_opencti_epss_percentile x_opencti_score } ... on Incident { name description aliases first_seen last_seen objective } """
[docs] self.properties_with_files = """ id standard_id entity_type parent_types spec_version created_at updated_at objectOrganization { id standard_id name } createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } revoked confidence created modified ... on AttackPattern { name description aliases x_mitre_platforms x_mitre_permissions_required x_mitre_detection x_mitre_id killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Campaign { name description aliases first_seen last_seen objective } ... on Note { attribute_abstract content authors note_types likelihood objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on ObservedData { first_observed last_observed number_observed objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Opinion { explanation authors opinion objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Report { name description report_types published objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Grouping { name description context objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on CourseOfAction { name description x_opencti_aliases } ... on DataComponent { name description dataSource { id standard_id entity_type parent_types spec_version created_at updated_at revoked confidence created modified name description x_mitre_platforms collection_layers } } ... on DataSource { name description x_mitre_platforms collection_layers } ... on Individual { name description x_opencti_aliases contact_information x_opencti_firstname x_opencti_lastname } ... on Organization { name description x_opencti_aliases contact_information x_opencti_organization_type x_opencti_reliability } ... on Sector { name description x_opencti_aliases contact_information } ... on System { name description x_opencti_aliases } ... on Indicator { pattern_type pattern_version pattern name description indicator_types valid_from valid_until x_opencti_score x_opencti_detection x_opencti_main_observable_type } ... on Infrastructure { name description aliases infrastructure_types first_seen last_seen } ... on IntrusionSet { name description aliases first_seen last_seen goals resource_level primary_motivation secondary_motivations } ... on City { name description latitude longitude precision x_opencti_aliases } ... on Country { name description latitude longitude precision x_opencti_aliases } ... on Region { name description latitude longitude precision x_opencti_aliases } ... on Position { name description latitude longitude precision x_opencti_aliases street_address postal_code } ... on Malware { name description aliases malware_types is_family first_seen last_seen architecture_execution_envs implementation_languages capabilities killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on MalwareAnalysis { product version configuration_version modules analysis_engine_version analysis_definition_version submitted analysis_started analysis_ended result_name result } ... on ThreatActor { name description aliases threat_actor_types first_seen last_seen roles goals sophistication resource_level primary_motivation secondary_motivations personal_motivations } ... on Tool { name description aliases tool_types tool_version killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Event { name description aliases event_types } ... on Channel { name description aliases channel_types } ... on Narrative { name description aliases narrative_types } ... on DataComponent { name description } ... on DataSource { name description } ... on Case { name description objects { edges { node { ... on BasicObject { id parent_types entity_type standard_id } ... on BasicRelationship { id parent_types entity_type standard_id } } } } } ... on Vulnerability { name description x_opencti_aliases x_opencti_cvss_vector_string x_opencti_cvss_base_score x_opencti_cvss_base_severity x_opencti_cvss_attack_vector x_opencti_cvss_attack_complexity x_opencti_cvss_privileges_required x_opencti_cvss_user_interaction x_opencti_cvss_scope x_opencti_cvss_confidentiality_impact x_opencti_cvss_integrity_impact x_opencti_cvss_availability_impact x_opencti_cvss_exploit_code_maturity x_opencti_cvss_remediation_level x_opencti_cvss_report_confidence x_opencti_cvss_temporal_score x_opencti_cvss_v2_vector_string x_opencti_cvss_v2_base_score x_opencti_cvss_v2_access_vector x_opencti_cvss_v2_access_complexity x_opencti_cvss_v2_authentication x_opencti_cvss_v2_confidentiality_impact x_opencti_cvss_v2_integrity_impact x_opencti_cvss_v2_availability_impact x_opencti_cvss_v2_exploitability x_opencti_cvss_v2_remediation_level x_opencti_cvss_v2_report_confidence x_opencti_cvss_v2_temporal_score x_opencti_cvss_v4_vector_string x_opencti_cvss_v4_base_score x_opencti_cvss_v4_base_severity x_opencti_cvss_v4_attack_vector x_opencti_cvss_v4_attack_complexity x_opencti_cvss_v4_attack_requirements x_opencti_cvss_v4_privileges_required x_opencti_cvss_v4_user_interaction x_opencti_cvss_v4_confidentiality_impact_v x_opencti_cvss_v4_confidentiality_impact_s x_opencti_cvss_v4_integrity_impact_v x_opencti_cvss_v4_integrity_impact_s x_opencti_cvss_v4_availability_impact_v x_opencti_cvss_v4_availability_impact_s x_opencti_cvss_v4_exploit_maturity x_opencti_cwe x_opencti_cisa_kev x_opencti_epss_score x_opencti_epss_percentile x_opencti_score } ... on Incident { name description aliases first_seen last_seen objective } importFiles { edges { node { id name size metaData { mimetype version } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } } } } """
[docs] def list(self, **kwargs): """List Stix-Domain-Object objects. :param types: the list of types :type types: list :param filters: the filters to apply :type filters: dict :param search: the search keyword :type search: str :param first: return the first n rows from the after ID (or the beginning if not set) :type first: int :param after: ID of the first row for pagination :type after: str :param orderBy: field to order results by :type orderBy: str :param orderMode: ordering mode (asc/desc) :type orderMode: str :param customAttributes: custom attributes to return :type customAttributes: str :param getAll: whether to retrieve all results :type getAll: bool :param withPagination: whether to include pagination info :type withPagination: bool :param withFiles: whether to include files :type withFiles: bool :return: List of Stix-Domain-Object objects :rtype: list """ types = kwargs.get("types", None) filters = kwargs.get("filters", None) search = kwargs.get("search", None) first = kwargs.get("first", 100) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) get_all = kwargs.get("getAll", False) with_pagination = kwargs.get("withPagination", False) with_files = kwargs.get("withFiles", False) self.opencti.app_logger.info( "Listing Stix-Domain-Objects with filters", {"filters": json.dumps(filters)} ) query = ( """ query StixDomainObjects($types: [String], $filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: StixDomainObjectsOrdering, $orderMode: OrderingMode) { stixDomainObjects(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ + ( custom_attributes if custom_attributes is not None else (self.properties_with_files if with_files else self.properties) ) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "types": types, "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) if get_all: final_data = [] data = self.opencti.process_multiple(result["data"]["stixDomainObjects"]) final_data = final_data + data while result["data"]["stixDomainObjects"]["pageInfo"]["hasNextPage"]: after = result["data"]["stixDomainObjects"]["pageInfo"]["endCursor"] self.opencti.app_logger.debug( "Listing Stix-Domain-Objects", {"after": after} ) result = self.opencti.query( query, { "types": types, "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) data = self.opencti.process_multiple( result["data"]["stixDomainObjects"] ) final_data = final_data + data return final_data else: return self.opencti.process_multiple( result["data"]["stixDomainObjects"], with_pagination )
[docs] def read(self, **kwargs): """Read a Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param types: list of Stix Domain Entity types :type types: list :param filters: the filters to apply if no id provided :type filters: dict :param customAttributes: custom attributes to return :type customAttributes: str :param withFiles: whether to include files :type withFiles: bool :return: Stix-Domain-Object object :rtype: dict or None """ id = kwargs.get("id", None) types = kwargs.get("types", None) filters = kwargs.get("filters", None) custom_attributes = kwargs.get("customAttributes", None) with_files = kwargs.get("withFiles", False) if id is not None: self.opencti.app_logger.info("Reading Stix-Domain-Object", {"id": id}) query = ( """ query StixDomainObject($id: String!) { stixDomainObject(id: $id) { """ + ( custom_attributes if custom_attributes is not None else (self.properties_with_files if with_files else self.properties) ) + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( result["data"]["stixDomainObject"] ) elif filters is not None: result = self.list( types=types, filters=filters, customAttributes=custom_attributes ) if len(result) > 0: return result[0] else: return None else: self.opencti.app_logger.error( "[opencti_stix_domain_object] Missing parameters: id or filters" ) return None
[docs] def get_by_stix_id_or_name(self, **kwargs): """Get a Stix-Domain-Object object by stix_id or name. :param types: a list of Stix-Domain-Object types :type types: list :param stix_id: the STIX ID of the Stix-Domain-Object :type stix_id: str :param name: the name of the Stix-Domain-Object :type name: str :param aliases: list of aliases to search :type aliases: list :param fieldName: the field name to use for alias search :type fieldName: str :param customAttributes: custom attributes to return :type customAttributes: str :return: Stix-Domain-Object object :rtype: dict or None """ types = kwargs.get("types", None) stix_id = kwargs.get("stix_id", None) name = kwargs.get("name", None) aliases = kwargs.get("aliases", []) field_name = kwargs.get("fieldName", "aliases") custom_attributes = kwargs.get("customAttributes", None) object_result = None if stix_id is not None: object_result = self.read(id=stix_id, customAttributes=custom_attributes) if object_result is None and name is not None: # TODO: Change this logic and move it to the API. object_result = self.read( types=types, filters={ "mode": "and", "filters": [{"key": "name", "values": [name]}], "filterGroups": [], }, customAttributes=custom_attributes, ) if object_result is None: object_result = self.read( types=types, filters={ "mode": "and", "filters": [{"key": field_name, "values": [name]}], "filterGroups": [], }, customAttributes=custom_attributes, ) if object_result is None: for alias in aliases: object_result = self.read( types=types, filters={ "mode": "and", "filters": [{"key": field_name, "values": [alias]}], "filterGroups": [], }, customAttributes=custom_attributes, ) return object_result
[docs] def update_field(self, **kwargs): """Update a Stix-Domain-Object object field. :param id: the Stix-Domain-Object id :type id: str :param input: the input of the field :type input: list :return: Updated Stix-Domain-Object object :rtype: dict or None """ id = kwargs.get("id", None) input = kwargs.get("input", None) if id is not None and input is not None: self.opencti.app_logger.info("Updating Stix-Domain-Object", {"id": id}) query = """ mutation StixDomainObjectEdit($id: ID!, $input: [EditInput]!) { stixDomainObjectEdit(id: $id) { fieldPatch(input: $input) { id standard_id entity_type } } } """ result = self.opencti.query( query, { "id": id, "input": input, }, ) return self.opencti.process_multiple_fields( result["data"]["stixDomainObjectEdit"]["fieldPatch"] ) else: self.opencti.app_logger.error( "[opencti_stix_domain_object] Missing parameters: id and input" ) return None
[docs] def delete(self, **kwargs): """Delete a Stix-Domain-Object. :param id: the Stix-Domain-Object id :type id: str :return: None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info("Deleting Stix-Domain-Object", {"id": id}) query = """ mutation StixDomainObjectEdit($id: ID!) { stixDomainObjectEdit(id: $id) { delete } } """ self.opencti.query(query, {"id": id}) else: self.opencti.app_logger.error( "[opencti_stix_domain_object] Missing parameters: id" ) return None
[docs] def add_file(self, **kwargs): """Upload a file to this Stix-Domain-Object. :param id: the Stix-Domain-Object id :type id: str :param file_name: the file name or path :type file_name: str :param data: the file data (optional, will read from file_name if not provided) :type data: bytes or None :param fileMarkings: list of marking definition IDs for the file :type fileMarkings: list :param version: version datetime :type version: str :param mime_type: MIME type of the file :type mime_type: str :param no_trigger_import: whether to skip triggering import :type no_trigger_import: bool :param embedded: whether the file is embedded :type embedded: bool :return: File upload result :rtype: dict or None """ id = kwargs.get("id", None) file_name = kwargs.get("file_name", None) data = kwargs.get("data", None) file_markings = kwargs.get("fileMarkings", None) version = kwargs.get("version", None) mime_type = kwargs.get("mime_type", "text/plain") no_trigger_import = kwargs.get("no_trigger_import", False) embedded = kwargs.get("embedded", False) if id is not None and file_name is not None: final_file_name = os.path.basename(file_name) query = """ mutation StixDomainObjectEdit($id: ID!, $file: Upload!, $fileMarkings: [String], $version: DateTime, $noTriggerImport: Boolean, $embedded: Boolean) { stixDomainObjectEdit(id: $id) { importPush(file: $file, version: $version, fileMarkings: $fileMarkings, noTriggerImport: $noTriggerImport, embedded: $embedded) { id name } } } """ if data is None: data = open(file_name, "rb") if file_name.endswith(".json"): mime_type = "application/json" else: mime_type = magic.from_file(file_name, mime=True) self.opencti.app_logger.info( "Uploading a file in Stix-Domain-Object", {"file": final_file_name, "id": id}, ) return self.opencti.query( query, { "id": id, "file": (self.opencti.file(final_file_name, data, mime_type)), "fileMarkings": file_markings, "version": version, "noTriggerImport": ( no_trigger_import if isinstance(no_trigger_import, bool) else no_trigger_import == "True" ), "embedded": embedded, }, ) else: self.opencti.app_logger.error( "[opencti_stix_domain_object] Missing parameters: id or file_name" ) return None
[docs] def push_list_export( self, entity_id, entity_type, file_name, file_markings, data, list_filters="", mime_type=None, ): """Push a list export file. :param entity_id: the entity id :type entity_id: str :param entity_type: the entity type :type entity_type: str :param file_name: the file name :type file_name: str :param file_markings: list of marking definition IDs :type file_markings: list :param data: the file data :type data: bytes or str :param list_filters: filters applied to the list export :type list_filters: str :param mime_type: MIME type of the file :type mime_type: str or None :return: None """ query = """ mutation StixDomainObjectsExportPush($entity_id: String, $entity_type: String!, $file: Upload!, $file_markings: [String]!, $listFilters: String) { stixDomainObjectsExportPush(entity_id: $entity_id, entity_type: $entity_type, file: $file, file_markings: $file_markings, listFilters: $listFilters) } """ if mime_type is None: file = self.opencti.file(file_name, data) else: file = self.opencti.file(file_name, data, mime_type) self.opencti.query( query, { "entity_id": entity_id, "entity_type": entity_type, "file": file, "file_markings": file_markings, "listFilters": list_filters, }, )
[docs] def push_entity_export( self, entity_id, file_name, data, file_markings=None, mime_type=None ): """Push an entity export file. :param entity_id: the entity id :type entity_id: str :param file_name: the file name :type file_name: str :param data: the file data :type data: bytes or str :param file_markings: list of marking definition IDs :type file_markings: list or None :param mime_type: MIME type of the file :type mime_type: str or None :return: None """ if file_markings is None: file_markings = [] query = """ mutation StixDomainObjectEdit( $id: ID!, $file: Upload!, $file_markings: [String]! ) { stixDomainObjectEdit(id: $id) { exportPush( file: $file, file_markings: $file_markings ) } } """ if mime_type is None: file = self.opencti.file(file_name, data) else: file = self.opencti.file(file_name, data, mime_type) self.opencti.query( query, {"id": entity_id, "file": file, "file_markings": file_markings} )
[docs] def update_created_by(self, **kwargs): """Update the Identity author of a Stix-Domain-Object object (created_by). :param id: the id of the Stix-Domain-Object :type id: str :param identity_id: the id of the Identity :type identity_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) identity_id = kwargs.get("identity_id", None) if id is not None: self.opencti.app_logger.info( "Updating author of Stix-Domain-Object with Identity", {"id": id, "identity_id": identity_id}, ) custom_attributes = """ id createdBy { ... on Identity { id standard_id entity_type parent_types name x_opencti_aliases description created modified } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } """ stix_domain_object = self.read(id=id, customAttributes=custom_attributes) if stix_domain_object["createdBy"] is not None: query = """ mutation StixDomainObjectEdit($id: ID!, $toId: StixRef! $relationship_type: String!) { stixDomainObjectEdit(id: $id) { relationDelete(toId: $toId, relationship_type: $relationship_type) { id } } } """ self.opencti.query( query, { "id": id, "toId": stix_domain_object["createdBy"]["id"], "relationship_type": "created-by", }, ) if identity_id is not None: # Add the new relation query = """ mutation StixDomainObjectEdit($id: ID!, $input: StixRefRelationshipAddInput!) { stixDomainObjectEdit(id: $id) { relationAdd(input: $input) { id } } } """ variables = { "id": id, "input": { "toId": identity_id, "relationship_type": "created-by", }, } self.opencti.query(query, variables) else: self.opencti.app_logger.error("Missing parameters: id") return False
[docs] def add_marking_definition(self, **kwargs): """Add a Marking-Definition object to Stix-Domain-Object object (object_marking_refs). :param id: the id of the Stix-Domain-Object :type id: str :param marking_definition_id: the id of the Marking-Definition :type marking_definition_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) marking_definition_id = kwargs.get("marking_definition_id", None) if id is not None and marking_definition_id is not None: custom_attributes = """ id objectMarking { id standard_id entity_type definition_type definition x_opencti_order x_opencti_color created modified } """ stix_domain_object = self.read(id=id, customAttributes=custom_attributes) if stix_domain_object is None: self.opencti.app_logger.error( "Cannot add Marking-Definition, entity not found" ) return False if marking_definition_id in stix_domain_object["objectMarkingIds"]: return True else: self.opencti.app_logger.info( "Adding Marking-Definition to Stix-Domain-Object", {"marking_definition_id": marking_definition_id, "id": id}, ) query = """ mutation StixDomainObjectAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) { stixDomainObjectEdit(id: $id) { relationAdd(input: $input) { id } } } """ self.opencti.query( query, { "id": id, "input": { "toId": marking_definition_id, "relationship_type": "object-marking", }, }, ) return True else: self.opencti.app_logger.error( "Missing parameters: id and marking_definition_id" ) return False
[docs] def remove_marking_definition(self, **kwargs): """Remove a Marking-Definition object from Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param marking_definition_id: the id of the Marking-Definition :type marking_definition_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) marking_definition_id = kwargs.get("marking_definition_id", None) if id is not None and marking_definition_id is not None: self.opencti.app_logger.info( "Removing Marking-Definition from Stix-Domain-Object", {"marking_definition_id": marking_definition_id, "id": id}, ) query = """ mutation StixDomainObjectRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) { stixDomainObjectEdit(id: $id) { relationDelete(toId: $toId, relationship_type: $relationship_type) { id } } } """ self.opencti.query( query, { "id": id, "toId": marking_definition_id, "relationship_type": "object-marking", }, ) return True else: self.opencti.app_logger.error( "Missing parameters: id and marking_definition_id" ) return False
[docs] def add_label(self, **kwargs): """Add a Label object to Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param label_id: the id of the Label :type label_id: str :param label_name: the name of the Label (alternative to label_id) :type label_name: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) label_id = kwargs.get("label_id", None) label_name = kwargs.get("label_name", None) if label_name is not None: label = self.opencti.label.read( filters={ "mode": "and", "filters": [{"key": "value", "values": [label_name]}], "filterGroups": [], } ) if label: label_id = label["id"] else: label = self.opencti.label.create(value=label_name) label_id = label["id"] if id is not None and label_id is not None: self.opencti.app_logger.info( "Adding label to Stix-Domain-Object", {"label_id": label_id, "id": id} ) query = """ mutation StixDomainObjectAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) { stixDomainObjectEdit(id: $id) { relationAdd(input: $input) { id } } } """ self.opencti.query( query, { "id": id, "input": { "toId": label_id, "relationship_type": "object-label", }, }, ) return True else: self.opencti.app_logger.error("Missing parameters: id and label_id") return False
[docs] def remove_label(self, **kwargs): """Remove a Label object from Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param label_id: the id of the Label :type label_id: str :param label_name: the name of the Label (alternative to label_id) :type label_name: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) label_id = kwargs.get("label_id", None) label_name = kwargs.get("label_name", None) if label_name is not None: label = self.opencti.label.read( filters={ "mode": "and", "filters": [{"key": "value", "values": [label_name]}], "filterGroups": [], } ) if label: label_id = label["id"] if id is not None and label_id is not None: self.opencti.app_logger.info( "Removing label from Stix-Domain-Object", {"label_id": label_id, "id": id}, ) query = """ mutation StixDomainObjectRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) { stixDomainObjectEdit(id: $id) { relationDelete(toId: $toId, relationship_type: $relationship_type) { id } } } """ self.opencti.query( query, { "id": id, "toId": label_id, "relationship_type": "object-label", }, ) return True else: self.opencti.app_logger.error("Missing parameters: id and label_id") return False
[docs] def add_external_reference(self, **kwargs): """Add an External-Reference object to Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param external_reference_id: the id of the External-Reference :type external_reference_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) external_reference_id = kwargs.get("external_reference_id", None) if id is not None and external_reference_id is not None: self.opencti.app_logger.info( "Adding External-Reference to Stix-Domain-Object", {"external_reference_id": external_reference_id, "id": id}, ) query = """ mutation StixDomainObjectEditRelationAdd($id: ID!, $input: StixRefRelationshipAddInput!) { stixDomainObjectEdit(id: $id) { relationAdd(input: $input) { id } } } """ self.opencti.query( query, { "id": id, "input": { "toId": external_reference_id, "relationship_type": "external-reference", }, }, ) return True else: self.opencti.app_logger.error( "Missing parameters: id and external_reference_id" ) return False
[docs] def remove_external_reference(self, **kwargs): """Remove an External-Reference object from Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param external_reference_id: the id of the External-Reference :type external_reference_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) external_reference_id = kwargs.get("external_reference_id", None) if id is not None and external_reference_id is not None: self.opencti.app_logger.info( "Removing External-Reference from Stix-Domain-Object", {"external_reference_id": external_reference_id, "id": id}, ) query = """ mutation StixDomainObjectRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) { stixDomainObjectEdit(id: $id) { relationDelete(toId: $toId, relationship_type: $relationship_type) { id } } } """ self.opencti.query( query, { "id": id, "toId": external_reference_id, "relationship_type": "external-reference", }, ) return True else: self.opencti.app_logger.error( "Missing parameters: id and external_reference_id" ) return False
[docs] def add_kill_chain_phase(self, **kwargs): """Add a Kill-Chain-Phase object to Stix-Domain-Object object (kill_chain_phases). :param id: the id of the Stix-Domain-Object :type id: str :param kill_chain_phase_id: the id of the Kill-Chain-Phase :type kill_chain_phase_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) kill_chain_phase_id = kwargs.get("kill_chain_phase_id", None) if id is not None and kill_chain_phase_id is not None: self.opencti.app_logger.info( "Adding Kill-Chain-Phase to Stix-Domain-Object", {"kill_chain_phase_id": kill_chain_phase_id, "id": id}, ) query = """ mutation StixDomainObjectAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) { stixDomainObjectEdit(id: $id) { relationAdd(input: $input) { id } } } """ self.opencti.query( query, { "id": id, "input": { "toId": kill_chain_phase_id, "relationship_type": "kill-chain-phase", }, }, ) return True else: self.opencti.app_logger.error( "Missing parameters: id and kill_chain_phase_id" ) return False
[docs] def remove_kill_chain_phase(self, **kwargs): """Remove a Kill-Chain-Phase object from Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :param kill_chain_phase_id: the id of the Kill-Chain-Phase :type kill_chain_phase_id: str :return: True if successful, False otherwise :rtype: bool """ id = kwargs.get("id", None) kill_chain_phase_id = kwargs.get("kill_chain_phase_id", None) if id is not None and kill_chain_phase_id is not None: self.opencti.app_logger.info( "Removing Kill-Chain-Phase from Stix-Domain-Object", {"kill_chain_phase_id": kill_chain_phase_id, "id": id}, ) query = """ mutation StixDomainObjectRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) { stixDomainObjectEdit(id: $id) { relationDelete(toId: $toId, relationship_type: $relationship_type) { id } } } """ self.opencti.query( query, { "id": id, "toId": kill_chain_phase_id, "relationship_type": "kill-chain-phase", }, ) return True else: self.opencti.app_logger.error( "[stix_domain_object] Missing parameters: id and kill_chain_phase_id" ) return False
[docs] def reports(self, **kwargs): """Get the reports about a Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :return: List of reports :rtype: list or None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info( "Getting reports of the Stix-Domain-Object", {"id": id} ) query = """ query StixDomainObject($id: String!) { stixDomainObject(id: $id) { reports { edges { node { id standard_id entity_type parent_types spec_version created_at updated_at createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } revoked confidence created modified name description report_types published } } } } } """ result = self.opencti.query(query, {"id": id}) processed_result = self.opencti.process_multiple_fields( result["data"]["stixDomainObject"] ) if processed_result: return processed_result["reports"] else: return [] else: self.opencti.app_logger.error("Missing parameters: id") return None
[docs] def notes(self, **kwargs): """Get the notes about a Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :return: List of notes :rtype: list or None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info( "Getting notes of the Stix-Domain-Object", {"id": id} ) query = """ query StixDomainObject($id: String!) { stixDomainObject(id: $id) { notes { edges { node { id standard_id entity_type parent_types spec_version created_at updated_at createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } revoked confidence created modified attribute_abstract content authors note_types likelihood } } } } } """ result = self.opencti.query(query, {"id": id}) processed_result = self.opencti.process_multiple_fields( result["data"]["stixDomainObject"] ) if processed_result: return processed_result["notes"] else: return [] else: self.opencti.app_logger.error("Missing parameters: id") return None
[docs] def observed_data(self, **kwargs): """Get the observed data of a Stix-Domain-Object object. :param id: the id of the Stix-Domain-Object :type id: str :return: List of observed data :rtype: list or None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info( "Getting Observed-Data of the Stix-Domain-Object", {"id": id} ) query = """ query StixDomainObject($id: String!) { stixDomainObject(id: $id) { observedData { edges { node { id standard_id entity_type parent_types spec_version created_at updated_at createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } revoked confidence created modified first_observed last_observed number_observed importFiles { edges { node { id name size metaData { mimetype version } } } } } } } } } """ result = self.opencti.query(query, {"id": id}) processed_result = self.opencti.process_multiple_fields( result["data"]["stixDomainObject"] ) if processed_result: return processed_result["observedData"] else: return [] else: self.opencti.app_logger.error("Missing parameters: id") return None