Source code for pycti.entities.opencti_stix_core_object

# coding: utf-8
import json


[docs] class StixCoreObject: """Main StixCoreObject class for OpenCTI Base class for managing STIX core objects in the OpenCTI platform. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """ def __init__(self, opencti): """Initialize the StixCoreObject instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types spec_version created_at updated_at objectOrganization { id standard_id name } createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified } } } ... on StixDomainObject { revoked confidence created modified } ... on AttackPattern { name description aliases x_mitre_platforms x_mitre_permissions_required x_mitre_detection x_mitre_id killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Campaign { name description aliases first_seen last_seen objective } ... on Note { attribute_abstract content authors note_types likelihood } ... on ObservedData { first_observed last_observed number_observed } ... on Opinion { explanation authors opinion } ... on Report { name description report_types published } ... on Grouping { name description context objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on CourseOfAction { name description x_opencti_aliases } ... on DataComponent { name description dataSource { id standard_id entity_type parent_types spec_version created_at updated_at revoked confidence created modified name description x_mitre_platforms collection_layers } } ... on DataSource { name description x_mitre_platforms collection_layers } ... on Individual { name description contact_information x_opencti_aliases x_opencti_firstname x_opencti_lastname } ... on Organization { name description contact_information x_opencti_aliases x_opencti_organization_type x_opencti_reliability } ... on Sector { name description contact_information x_opencti_aliases } ... on System { name description contact_information x_opencti_aliases } ... on Indicator { pattern_type pattern_version pattern name description indicator_types valid_from valid_until x_opencti_score x_opencti_detection x_opencti_main_observable_type } ... on Infrastructure { name description aliases infrastructure_types first_seen last_seen } ... on IntrusionSet { name description aliases first_seen last_seen goals resource_level primary_motivation secondary_motivations } ... on City { name description latitude longitude precision x_opencti_aliases } ... on Country { name description latitude longitude precision x_opencti_aliases } ... on Region { name description latitude longitude precision x_opencti_aliases } ... on Position { name description latitude longitude precision x_opencti_aliases street_address postal_code } ... on Malware { name description aliases malware_types is_family first_seen last_seen architecture_execution_envs implementation_languages capabilities killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on MalwareAnalysis { product version configuration_version modules analysis_engine_version analysis_definition_version submitted analysis_started analysis_ended result_name result } ... on ThreatActor { name description aliases threat_actor_types first_seen last_seen roles goals sophistication resource_level primary_motivation secondary_motivations personal_motivations } ... on Tool { name description aliases tool_types tool_version killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Vulnerability { name description x_opencti_aliases x_opencti_cvss_vector_string x_opencti_cvss_base_score x_opencti_cvss_base_severity x_opencti_cvss_attack_vector x_opencti_cvss_attack_complexity x_opencti_cvss_privileges_required x_opencti_cvss_user_interaction x_opencti_cvss_scope x_opencti_cvss_confidentiality_impact x_opencti_cvss_integrity_impact x_opencti_cvss_availability_impact x_opencti_cvss_exploit_code_maturity x_opencti_cvss_remediation_level x_opencti_cvss_report_confidence x_opencti_cvss_temporal_score x_opencti_cvss_v2_vector_string x_opencti_cvss_v2_base_score x_opencti_cvss_v2_access_vector x_opencti_cvss_v2_access_complexity x_opencti_cvss_v2_authentication x_opencti_cvss_v2_confidentiality_impact x_opencti_cvss_v2_integrity_impact x_opencti_cvss_v2_availability_impact x_opencti_cvss_v2_exploitability x_opencti_cvss_v2_remediation_level x_opencti_cvss_v2_report_confidence x_opencti_cvss_v2_temporal_score x_opencti_cvss_v4_vector_string x_opencti_cvss_v4_base_score x_opencti_cvss_v4_base_severity x_opencti_cvss_v4_attack_vector x_opencti_cvss_v4_attack_complexity x_opencti_cvss_v4_attack_requirements x_opencti_cvss_v4_privileges_required x_opencti_cvss_v4_user_interaction x_opencti_cvss_v4_confidentiality_impact_v x_opencti_cvss_v4_confidentiality_impact_s x_opencti_cvss_v4_integrity_impact_v x_opencti_cvss_v4_integrity_impact_s x_opencti_cvss_v4_availability_impact_v x_opencti_cvss_v4_availability_impact_s x_opencti_cvss_v4_exploit_maturity x_opencti_cwe x_opencti_cisa_kev x_opencti_epss_score x_opencti_epss_percentile x_opencti_score } ... on Incident { name description aliases first_seen last_seen objective } ... on Event { name description } ... on Channel { name description aliases channel_types } ... on Narrative { name description aliases narrative_types } ... on Case { name description objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on Feedback { name description objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on StixCyberObservable { observable_value indicators { edges { node { id pattern pattern_type } } } } ... on AutonomousSystem { number name_alt: name rir } ... on Directory { path path_enc ctime mtime atime } ... on DomainName { value } ... on EmailAddr { value display_name } ... on EmailMessage { is_multipart attribute_date content_type message_id subject received_lines body } ... on Artifact { mime_type payload_bin url encryption_algorithm decryption_key hashes { algorithm hash } importFiles { edges { node { id name size } } } } ... on StixFile { extensions size name_alt: name name_enc magic_number_hex mime_type ctime mtime atime x_opencti_additional_names hashes { algorithm hash } } ... on X509Certificate { is_self_signed version serial_number signature_algorithm issuer subject subject_public_key_algorithm subject_public_key_modulus subject_public_key_exponent validity_not_before validity_not_after hashes { algorithm hash } basic_constraints name_constraints policy_constraints key_usage extended_key_usage subject_key_identifier authority_key_identifier subject_alternative_name issuer_alternative_name subject_directory_attributes crl_distribution_points inhibit_any_policy private_key_usage_period_not_before private_key_usage_period_not_after certificate_policies policy_mappings } ... on IPv4Addr { value } ... on IPv6Addr { value } ... on MacAddr { value } ... on Mutex { name_alt: name } ... on NetworkTraffic { extensions start end is_active src_port dst_port protocols src_byte_count dst_byte_count src_packets dst_packets } ... on Process { extensions is_hidden pid created_time cwd command_line environment_variables } ... on Software { name_alt: name cpe swid languages vendor version } ... on Url { value } ... on UserAccount { extensions user_id credential account_login account_type display_name is_service_account is_privileged can_escalate_privs is_disabled account_created account_expires credential_last_changed account_first_login account_last_login } ... on WindowsRegistryKey { attribute_key modified_time number_of_subkeys } ... on WindowsRegistryValueType { name_alt: name data data_type } ... on CryptographicKey { value } ... on CryptocurrencyWallet { value } ... on Hostname { value } ... on Text { value } ... on UserAgent { value } ... on BankAccount { iban bic account_number } ... on PhoneNumber { value } ... on TrackingNumber { value } ... on Credential { value } ... on PaymentCard { card_number expiration_date cvv holder_name } ... on Persona { persona_name persona_type } ... on MediaContent { title content_alt: content media_category url publication_date } """
[docs] self.properties_with_files = """ id standard_id entity_type parent_types spec_version created_at updated_at objectOrganization { id standard_id name } createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } ... on StixDomainObject { revoked confidence created modified } importFiles { edges { node { id name size metaData { mimetype version } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } } } } ... on AttackPattern { name description aliases x_mitre_platforms x_mitre_permissions_required x_mitre_detection x_mitre_id killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Campaign { name description aliases first_seen last_seen objective } ... on Note { attribute_abstract content authors note_types likelihood } ... on ObservedData { first_observed last_observed number_observed } ... on Opinion { explanation authors opinion } ... on Report { name description report_types published } ... on Grouping { name description context objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on CourseOfAction { name description x_opencti_aliases } ... on DataComponent { name description dataSource { id standard_id entity_type parent_types spec_version created_at updated_at revoked confidence created modified name description x_mitre_platforms collection_layers } } ... on DataSource { name description x_mitre_platforms collection_layers } ... on Individual { name description contact_information x_opencti_aliases x_opencti_firstname x_opencti_lastname } ... on Organization { name description contact_information x_opencti_aliases x_opencti_organization_type x_opencti_reliability } ... on Sector { name description contact_information x_opencti_aliases } ... on System { name description contact_information x_opencti_aliases } ... on Indicator { pattern_type pattern_version pattern name description indicator_types valid_from valid_until x_opencti_score x_opencti_detection x_opencti_main_observable_type } ... on Infrastructure { name description aliases infrastructure_types first_seen last_seen } ... on IntrusionSet { name description aliases first_seen last_seen goals resource_level primary_motivation secondary_motivations } ... on City { name description latitude longitude precision x_opencti_aliases } ... on Country { name description latitude longitude precision x_opencti_aliases } ... on Region { name description latitude longitude precision x_opencti_aliases } ... on Position { name description latitude longitude precision x_opencti_aliases street_address postal_code } ... on Malware { name description aliases malware_types is_family first_seen last_seen architecture_execution_envs implementation_languages capabilities killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on MalwareAnalysis { product version configuration_version modules analysis_engine_version analysis_definition_version submitted analysis_started analysis_ended result_name result } ... on ThreatActor { name description aliases threat_actor_types first_seen last_seen roles goals sophistication resource_level primary_motivation secondary_motivations personal_motivations } ... on Tool { name description aliases tool_types tool_version killChainPhases { id standard_id entity_type kill_chain_name phase_name x_opencti_order created modified } } ... on Vulnerability { name description x_opencti_aliases x_opencti_cvss_vector_string x_opencti_cvss_base_score x_opencti_cvss_base_severity x_opencti_cvss_attack_vector x_opencti_cvss_attack_complexity x_opencti_cvss_privileges_required x_opencti_cvss_user_interaction x_opencti_cvss_scope x_opencti_cvss_confidentiality_impact x_opencti_cvss_integrity_impact x_opencti_cvss_availability_impact x_opencti_cvss_exploit_code_maturity x_opencti_cvss_remediation_level x_opencti_cvss_report_confidence x_opencti_cvss_temporal_score x_opencti_cvss_v2_vector_string x_opencti_cvss_v2_base_score x_opencti_cvss_v2_access_vector x_opencti_cvss_v2_access_complexity x_opencti_cvss_v2_authentication x_opencti_cvss_v2_confidentiality_impact x_opencti_cvss_v2_integrity_impact x_opencti_cvss_v2_availability_impact x_opencti_cvss_v2_exploitability x_opencti_cvss_v2_remediation_level x_opencti_cvss_v2_report_confidence x_opencti_cvss_v2_temporal_score x_opencti_cvss_v4_vector_string x_opencti_cvss_v4_base_score x_opencti_cvss_v4_base_severity x_opencti_cvss_v4_attack_vector x_opencti_cvss_v4_attack_complexity x_opencti_cvss_v4_attack_requirements x_opencti_cvss_v4_privileges_required x_opencti_cvss_v4_user_interaction x_opencti_cvss_v4_confidentiality_impact_v x_opencti_cvss_v4_confidentiality_impact_s x_opencti_cvss_v4_integrity_impact_v x_opencti_cvss_v4_integrity_impact_s x_opencti_cvss_v4_availability_impact_v x_opencti_cvss_v4_availability_impact_s x_opencti_cvss_v4_exploit_maturity x_opencti_cwe x_opencti_cisa_kev x_opencti_epss_score x_opencti_epss_percentile x_opencti_score } ... on Incident { name description aliases first_seen last_seen objective } ... on Event { name description } ... on Channel { name description aliases channel_types } ... on Narrative { name description aliases narrative_types } ... on Case { name description objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on Feedback { name description objects { edges { node { ... on BasicObject { id entity_type standard_id } ... on BasicRelationship { id entity_type standard_id } } } } } ... on StixCyberObservable { observable_value indicators { edges { node { id pattern pattern_type } } } } ... on AutonomousSystem { number name_alt: name rir } ... on Directory { path path_enc ctime mtime atime } ... on DomainName { value } ... on EmailAddr { value display_name } ... on EmailMessage { is_multipart attribute_date content_type message_id subject received_lines body } ... on Artifact { mime_type payload_bin url encryption_algorithm decryption_key hashes { algorithm hash } importFiles { edges { node { id name size } } } } ... on StixFile { extensions size name_alt: name name_enc magic_number_hex mime_type ctime mtime atime x_opencti_additional_names hashes { algorithm hash } } ... on X509Certificate { is_self_signed version serial_number signature_algorithm issuer subject subject_public_key_algorithm subject_public_key_modulus subject_public_key_exponent validity_not_before validity_not_after hashes { algorithm hash } basic_constraints name_constraints policy_constraints key_usage extended_key_usage subject_key_identifier authority_key_identifier subject_alternative_name issuer_alternative_name subject_directory_attributes crl_distribution_points inhibit_any_policy private_key_usage_period_not_before private_key_usage_period_not_after certificate_policies policy_mappings } ... on IPv4Addr { value } ... on IPv6Addr { value } ... on MacAddr { value } ... on Mutex { name_alt: name } ... on NetworkTraffic { extensions start end is_active src_port dst_port protocols src_byte_count dst_byte_count src_packets dst_packets } ... on Process { extensions is_hidden pid created_time cwd command_line environment_variables } ... on Software { name_alt: name cpe swid languages vendor version } ... on Url { value } ... on UserAccount { extensions user_id credential account_login account_type display_name is_service_account is_privileged can_escalate_privs is_disabled account_created account_expires credential_last_changed account_first_login account_last_login } ... on WindowsRegistryKey { attribute_key modified_time number_of_subkeys } ... on WindowsRegistryValueType { name_alt: name data data_type } ... on CryptographicKey { value } ... on CryptocurrencyWallet { value } ... on Hostname { value } ... on Text { value } ... on UserAgent { value } ... on BankAccount { iban bic account_number } ... on PhoneNumber { value } ... on TrackingNumber { value } ... on Credential { value } ... on PaymentCard { card_number expiration_date cvv holder_name } ... on Persona { persona_name persona_type } ... on MediaContent { title content_alt: content media_category url publication_date } """
[docs] def list(self, **kwargs): """List Stix-Core-Object objects. :param types: the list of types :type types: list :param filters: the filters to apply :type filters: dict :param search: the search keyword :type search: str :param first: return the first n rows from the after ID (or the beginning if not set) :type first: int :param after: ID of the first row for pagination :type after: str :param orderBy: field to order results by :type orderBy: str :param orderMode: ordering mode (asc/desc) :type orderMode: str :param customAttributes: custom attributes to return :type customAttributes: str :param getAll: whether to retrieve all results :type getAll: bool :param withPagination: whether to include pagination info :type withPagination: bool :param withFiles: whether to include files :type withFiles: bool :return: List of Stix-Core-Object objects :rtype: list """ types = kwargs.get("types", None) filters = kwargs.get("filters", None) search = kwargs.get("search", None) first = kwargs.get("first", 100) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) get_all = kwargs.get("getAll", False) with_pagination = kwargs.get("withPagination", False) with_files = kwargs.get("withFiles", False) self.opencti.app_logger.info( "Listing Stix-Core-Objects with filters", {"filters": json.dumps(filters)} ) query = ( """ query StixCoreObjects($types: [String], $filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: StixCoreObjectsOrdering, $orderMode: OrderingMode) { stixCoreObjects(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ + ( custom_attributes if custom_attributes is not None else (self.properties_with_files if with_files else self.properties) ) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "types": types, "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) if get_all: final_data = [] data = self.opencti.process_multiple(result["data"]["stixCoreObjects"]) final_data = final_data + data while result["data"]["stixCoreObjects"]["pageInfo"]["hasNextPage"]: after = result["data"]["stixCoreObjects"]["pageInfo"]["endCursor"] self.opencti.app_logger.debug( "Listing Stix-Core-Objects", {"after": after} ) result = self.opencti.query( query, { "types": types, "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) data = self.opencti.process_multiple(result["data"]["stixCoreObjects"]) final_data = final_data + data return final_data else: return self.opencti.process_multiple( result["data"]["stixCoreObjects"], with_pagination )
[docs] def read(self, **kwargs): """Read a Stix-Core-Object object. :param id: the id of the Stix-Core-Object :type id: str :param types: list of Stix Core Entity types :type types: list :param filters: the filters to apply if no id provided :type filters: dict :param customAttributes: custom attributes to return :type customAttributes: str :param withFiles: whether to include files :type withFiles: bool :return: Stix-Core-Object object :rtype: dict or None """ id = kwargs.get("id", None) types = kwargs.get("types", None) filters = kwargs.get("filters", None) custom_attributes = kwargs.get("customAttributes", None) with_files = kwargs.get("withFiles", False) if id is not None: self.opencti.app_logger.info("Reading Stix-Core-Object", {"id": id}) query = ( """ query StixCoreObject($id: String!) { stixCoreObject(id: $id) { """ + ( custom_attributes if custom_attributes is not None else (self.properties_with_files if with_files else self.properties) ) + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( result["data"]["stixCoreObject"] ) elif filters is not None: result = self.list( types=types, filters=filters, customAttributes=custom_attributes ) if len(result) > 0: return result[0] else: return None else: self.opencti.app_logger.error( "[opencti_stix_core_object] Missing parameters: id or filters" ) return None
[docs] def list_files(self, **kwargs): """List files of a Stix-Core-Object. :param id: the id of the Stix-Core-Object :type id: str :return: List of files associated with the object :rtype: list """ id = kwargs.get("id", None) self.opencti.app_logger.debug("Listing files of Stix-Core-Object", {"id": id}) query = """ query StixCoreObject($id: String!) { stixCoreObject(id: $id) { importFiles { edges { node { id name size metaData { mimetype version } } } } } } """ result = self.opencti.query(query, {"id": id}) entity = self.opencti.process_multiple_fields(result["data"]["stixCoreObject"]) return entity["importFiles"]
[docs] def push_list_export( self, entity_id, entity_type, file_name, file_markings, data, list_filters="", mime_type=None, ): """Push a list export file for Stix-Core-Objects. :param entity_id: the id of the entity (optional) :type entity_id: str or None :param entity_type: the type of the entity :type entity_type: str :param file_name: the name of the file to export :type file_name: str :param file_markings: list of marking definition ids to apply :type file_markings: list :param data: the data content to export :type data: str :param list_filters: filters to apply on the list (default: "") :type list_filters: str :param mime_type: the MIME type of the file (optional) :type mime_type: str or None """ query = """ mutation StixCoreObjectsExportPush($entity_id: String, $entity_type: String!, $file: Upload!, $file_markings: [String]!, $listFilters: String) { stixCoreObjectsExportPush(entity_id: $entity_id, entity_type: $entity_type, file: $file, file_markings: $file_markings, listFilters: $listFilters) } """ if mime_type is None: file = self.opencti.file(file_name, data) else: file = self.opencti.file(file_name, data, mime_type) self.opencti.query( query, { "entity_id": entity_id, "entity_type": entity_type, "file": file, "file_markings": file_markings, "listFilters": list_filters, }, )
[docs] def push_analysis( self, entity_id, file_name, data, content_source, content_type, analysis_type, ): """Push an analysis file for a Stix-Core-Object. :param entity_id: the id of the Stix-Core-Object :type entity_id: str :param file_name: the name of the analysis file :type file_name: str :param data: the analysis data content :type data: str :param content_source: the source of the content :type content_source: str :param content_type: the type of analysis content :type content_type: str :param analysis_type: the type of analysis :type analysis_type: str """ query = """ mutation StixCoreObjectEdit( $id: ID!, $file: Upload!, $contentSource: String!, $contentType: AnalysisContentType!, $analysisType: String! ) { stixCoreObjectEdit(id: $id) { analysisPush(file: $file,contentSource: $contentSource,contentType: $contentType,analysisType: $analysisType){ id name } } } """ file = self.opencti.file(file_name, data) self.opencti.query( query, { "id": entity_id, "file": file, "contentSource": content_source, "contentType": content_type, "analysisType": analysis_type, }, )
[docs] def reports(self, **kwargs): """Get the reports about a Stix-Core-Object object. :param id: the id of the Stix-Core-Object :type id: str :return: List of reports :rtype: list or None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info( "Getting reports of the Stix-Core-Object", {"id": id} ) query = """ query StixCoreObject($id: String!) { stixCoreObject(id: $id) { reports { edges { node { id standard_id entity_type parent_types spec_version created_at updated_at createdBy { ... on Identity { id standard_id entity_type parent_types spec_version identity_class name description roles contact_information x_opencti_aliases created modified objectLabel { id value color } } ... on Organization { x_opencti_organization_type x_opencti_reliability } ... on Individual { x_opencti_firstname x_opencti_lastname } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } objectLabel { id value color } externalReferences { edges { node { id standard_id entity_type source_name description url hash external_id created modified importFiles { edges { node { id name size metaData { mimetype version } } } } } } } revoked confidence created modified name description report_types published } } } } } """ result = self.opencti.query(query, {"id": id}) processed_result = self.opencti.process_multiple_fields( result["data"]["stixCoreObject"] ) if processed_result: return processed_result["reports"] else: return [] else: self.opencti.app_logger.error("Missing parameters: id") return None
[docs] def rule_apply(self, **kwargs): """Apply rule to Stix-Core-Object object. :param element_id: the Stix-Core-Object id :type element_id: str :param rule_id: the rule to apply :type rule_id: str """ rule_id = kwargs.get("rule_id", None) element_id = kwargs.get("element_id", None) if element_id is not None and rule_id is not None: self.opencti.app_logger.info( "Apply rule stix_core_object", {"id": element_id} ) query = """ mutation StixCoreApplyRule($elementId: ID!, $ruleId: ID!) { ruleApply(elementId: $elementId, ruleId: $ruleId) } """ self.opencti.query(query, {"elementId": element_id, "ruleId": rule_id}) else: self.opencti.app_logger.error( "[stix_core_object] Cannot apply rule, missing parameters: id" ) return None
[docs] def rule_apply_async(self, **kwargs): """Apply rule to Stix-Core-Object object. :param element_id: the Stix-Core-Object id :type element_id: str :param rule_id: the rule to apply :type rule_id: str """ rule_id = kwargs.get("rule_id", None) element_id = kwargs.get("element_id", None) execution_id = kwargs.get("execution_id", None) rule_apply_complete = False if element_id is not None and rule_id is not None and execution_id is not None: while not rule_apply_complete: self.opencti.app_logger.info( "Apply rule async stix_core_object", {"id": element_id, "execution_id": execution_id}, ) query = """ mutation StixCoreApplyRule($elementId: ID!, $ruleId: ID!, $executionId: ID!) { ruleApplyAsync(elementId: $elementId, ruleId: $ruleId, executionId: $executionId) } """ result = self.opencti.query( query, { "elementId": element_id, "ruleId": rule_id, "executionId": execution_id, }, ) rule_apply_complete = result["data"]["ruleApplyAsync"] self.opencti.app_logger.info( "Apply rule async stix_core_object complete", {"id": element_id} ) else: self.opencti.app_logger.error( "[stix_core_object] Cannot apply rule, missing parameters: id" ) return None
[docs] def rule_clear(self, **kwargs): """Apply rule clear to Stix-Core-Object object. :param element_id: the Stix-Core-Object id :type element_id: str :param rule_id: the rule to clear :type rule_id: str """ rule_id = kwargs.get("rule_id", None) element_id = kwargs.get("element_id", None) if element_id is not None and rule_id is not None: self.opencti.app_logger.info( "Apply rule clear stix_core_object", {"id": element_id} ) query = """ mutation StixCoreClearRule($elementId: ID!, $ruleId: ID!) { ruleClear(elementId: $elementId, ruleId: $ruleId) } """ self.opencti.query(query, {"elementId": element_id, "ruleId": rule_id}) else: self.opencti.app_logger.error( "[stix_core_object] Cannot clear rule, missing parameters: id" ) return None
[docs] def rules_rescan(self, **kwargs): """Apply rules rescan to Stix-Core-Object object. :param element_id: the Stix-Core-Object id :type element_id: str """ element_id = kwargs.get("element_id", None) if element_id is not None: self.opencti.app_logger.info( "Apply rules rescan stix_core_object", {"id": element_id} ) query = """ mutation StixCoreRescanRules($elementId: ID!) { rulesRescan(elementId: $elementId) } """ self.opencti.query(query, {"elementId": element_id}) else: self.opencti.app_logger.error( "[stix_core_object] Cannot rescan rule, missing parameters: id" ) return None
[docs] def rule_rescan_async(self, **kwargs): """Apply rules rescan to Stix-Core-Object object. :param element_id: the Stix-Core-Object id :type element_id: str """ element_id = kwargs.get("element_id", None) execution_id = kwargs.get("execution_id", None) rule_rescan_complete = False if element_id is not None and execution_id is not None: while not rule_rescan_complete: self.opencti.app_logger.info( "Apply rules rescan stix_core_object", {"id": element_id} ) query = """ mutation StixCoreRescanRules($elementId: ID!) { rulesRescanAsync(elementId: $elementId, $executionId: ID!) } """ result = self.opencti.query( query, {"elementId": element_id, "executionId": execution_id} ) rule_rescan_complete = result["data"]["rulesRescanAsync"] self.opencti.app_logger.info( "Rescan rule async stix_core_object complete", {"id": element_id} ) else: self.opencti.app_logger.error( "[stix_core_object] Cannot rescan rule, missing parameters: id" ) return None
[docs] def clear_access_restriction(self, **kwargs): """Ask clear restriction on a Stix-Core-Object. :param element_id: the Stix-Core-Object id :type element_id: str """ element_id = kwargs.get("element_id", None) if element_id is not None: query = """ mutation StixCoreObjectEdit($id: ID!) { stixCoreObjectEdit(id: $id) { clearAccessRestriction { id } } } """ self.opencti.query( query, { "id": element_id, }, ) else: self.opencti.app_logger.error( "[stix_core_object] Cannot clear access restriction, missing parameters: id" ) return None
[docs] def ask_enrichment(self, **kwargs): """Ask enrichment with a single connector. :param element_id: the Stix-Core-Object id :type element_id: str :param connector_id: the connector id :type connector_id: str """ element_id = kwargs.get("element_id", None) connector_id = kwargs.get("connector_id", None) query = """ mutation StixCoreObjectEdit($id: ID!, $connectorId: ID!) { stixCoreObjectEdit(id: $id) { askEnrichment(connectorId: $connectorId) { id } } } """ self.opencti.query( query, { "id": element_id, "connectorId": connector_id, }, )
[docs] def ask_enrichments(self, **kwargs): """Ask enrichment with multiple connectors. :param element_id: the Stix-Core-Object id :type element_id: str :param connector_ids: list of connector ids :type connector_ids: list """ element_id = kwargs.get("element_id", None) connector_ids = kwargs.get("connector_ids", None) query = """ mutation StixCoreObjectEdit($id: ID!, $connectorIds: [ID!]!) { stixCoreObjectEdit(id: $id) { askEnrichments(connectorIds: $connectorIds) { id } } } """ self.opencti.query( query, { "id": element_id, "connectorIds": connector_ids, }, )
[docs] def organization_share(self, entity_id, organization_ids, sharing_direct_container): """Share element to multiple organizations. :param entity_id: the Stix-Core-Object id :type entity_id: str :param organization_ids: list of organization ids to share with :type organization_ids: list :param sharing_direct_container: whether to share direct containers :type sharing_direct_container: bool """ query = """ mutation StixCoreObjectEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) { stixCoreObjectEdit(id: $id) { restrictionOrganizationAdd(organizationId: $organizationId, directContainerSharing: $directContainerSharing) { id } } } """ self.opencti.query( query, { "id": entity_id, "organizationId": organization_ids, "directContainerSharing": sharing_direct_container, }, )
[docs] def organization_unshare( self, entity_id, organization_ids, sharing_direct_container ): """Unshare element from multiple organizations. :param entity_id: the Stix-Core-Object id :type entity_id: str :param organization_ids: list of organization ids to unshare from :type organization_ids: list :param sharing_direct_container: whether to unshare direct containers :type sharing_direct_container: bool """ query = """ mutation StixCoreObjectEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) { stixCoreObjectEdit(id: $id) { restrictionOrganizationDelete(organizationId: $organizationId, directContainerSharing: $directContainerSharing) { id } } } """ self.opencti.query( query, { "id": entity_id, "organizationId": organization_ids, "directContainerSharing": sharing_direct_container, }, )
[docs] def delete(self, **kwargs): """Delete a Stix-Core-Object object. :param id: the Stix-Core-Object id :type id: str """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info("Deleting stix_core_object", {"id": id}) query = """ mutation StixCoreObjectEdit($id: ID!) { stixCoreObjectEdit(id: $id) { delete } } """ self.opencti.query(query, {"id": id}) else: self.opencti.app_logger.error( "[opencti_stix_core_object] Missing parameters: id" ) return None
[docs] def remove_from_draft(self, **kwargs): """Remove a Stix-Core-Object object from draft (revert). :param id: the Stix-Core-Object id :type id: str """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info("Draft remove stix_core_object", {"id": id}) query = """ mutation StixCoreObjectEditDraftRemove($id: ID!) { stixCoreObjectEdit(id: $id) { removeFromDraft } } """ self.opencti.query(query, {"id": id}) else: self.opencti.app_logger.error( "[stix_core_object] Cannot remove from draft, missing parameters: id" ) return None