Source code for pycti.entities.opencti_external_reference

# coding: utf-8

import json
import os
import uuid

import magic
from stix2.canonicalization.Canonicalize import canonicalize


[docs] class ExternalReference: """Main ExternalReference class for OpenCTI Manages external references and citations in the OpenCTI platform. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """
[docs] def __init__(self, opencti): """Initialize the ExternalReference instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types created_at updated_at created modified source_name description url hash external_id """
[docs] self.properties_with_files = """ id standard_id entity_type parent_types created_at updated_at created modified source_name description url hash external_id importFiles { edges { node { id name size metaData { mimetype version } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } } } } """
@staticmethod
[docs] def generate_id(url=None, source_name=None, external_id=None): """Generate a STIX ID for an External Reference. :param url: The URL of the external reference :type url: str or None :param source_name: The source name :type source_name: str or None :param external_id: The external ID :type external_id: str or None :return: STIX ID for the external reference, or None if insufficient parameters :rtype: str or None """ if url is not None: data = {"url": url} elif source_name is not None and external_id is not None: data = {"source_name": source_name, "external_id": external_id} else: return None data = canonicalize(data, utf8=False) id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)) return "external-reference--" + id
@staticmethod
[docs] def generate_id_from_data(data): """Generate a STIX ID from external reference data. :param data: Dictionary containing 'url', 'source_name', or 'external_id' keys :type data: dict :return: STIX ID for the external reference :rtype: str or None """ return ExternalReference.generate_id( data.get("url"), data.get("source_name"), data.get("external_id") )
[docs] def list(self, **kwargs): """List External-Reference objects. :param filters: the filters to apply :type filters: dict :param first: return the first n rows from the after ID (or the beginning if not set) :type first: int :param after: ID of the first row for pagination :type after: str :param orderBy: field to order results by :type orderBy: str :param orderMode: ordering mode (asc/desc) :type orderMode: str :param customAttributes: custom attributes to return :type customAttributes: list :param getAll: whether to retrieve all results :type getAll: bool :param withPagination: whether to include pagination info :type withPagination: bool :param withFiles: whether to include files :type withFiles: bool :return: List of External-Reference objects :rtype: list """ filters = kwargs.get("filters", None) first = kwargs.get("first", 500) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) get_all = kwargs.get("getAll", False) with_pagination = kwargs.get("withPagination", False) with_files = kwargs.get("withFiles", False) self.opencti.app_logger.info( "Listing External-Reference with filters", {"filters": json.dumps(filters)} ) query = ( """ query ExternalReferences($filters: FilterGroup, $first: Int, $after: ID, $orderBy: ExternalReferencesOrdering, $orderMode: OrderingMode) { externalReferences(filters: $filters, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ + ( custom_attributes if custom_attributes is not None else (self.properties_with_files if with_files else self.properties) ) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "filters": filters, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) if get_all: final_data = [] data = self.opencti.process_multiple(result["data"]["externalReferences"]) final_data = final_data + data while result["data"]["externalReferences"]["pageInfo"]["hasNextPage"]: after = result["data"]["externalReferences"]["pageInfo"]["endCursor"] self.opencti.app_logger.debug( "Listing External-References", {"after": after} ) result = self.opencti.query( query, { "filters": filters, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) data = self.opencti.process_multiple( result["data"]["externalReferences"] ) final_data = final_data + data return final_data else: return self.opencti.process_multiple( result["data"]["externalReferences"], with_pagination )
[docs] def read(self, **kwargs): """Read an External-Reference object. :param id: the id of the External-Reference :type id: str :param filters: the filters to apply if no id provided :type filters: dict :return: External-Reference object :rtype: dict or None """ id = kwargs.get("id", None) filters = kwargs.get("filters", None) if id is not None: self.opencti.app_logger.info("Reading External-Reference", {"id": id}) query = ( """ query ExternalReference($id: String!) { externalReference(id: $id) { """ + self.properties + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( result["data"]["externalReference"] ) elif filters is not None: result = self.list(filters=filters) if len(result) > 0: return result[0] else: return None else: self.opencti.app_logger.error( "[opencti_external_reference] Missing parameters: id or filters" ) return None
[docs] def create(self, **kwargs): """Create an External Reference object. :param stix_id: (optional) the STIX ID :type stix_id: str :param created: (optional) creation date :type created: datetime :param modified: (optional) modification date :type modified: datetime :param source_name: the source name of the External Reference (required if no url) :type source_name: str :param url: (optional) the URL of the external reference (required if no source_name) :type url: str :param external_id: (optional) the external ID :type external_id: str :param description: (optional) description :type description: str :param x_opencti_stix_ids: (optional) list of additional STIX IDs :type x_opencti_stix_ids: list :param update: (optional) whether to update if exists (default: False) :type update: bool :param files: (optional) list of File objects to attach :type files: list :param filesMarkings: (optional) list of lists of marking definition IDs for each file :type filesMarkings: list :return: External Reference object :rtype: dict or None """ stix_id = kwargs.get("stix_id", None) created = kwargs.get("created", None) modified = kwargs.get("modified", None) source_name = kwargs.get("source_name", None) url = kwargs.get("url", None) external_id = kwargs.get("external_id", None) description = kwargs.get("description", None) x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None) update = kwargs.get("update", False) files = kwargs.get("files", None) files_markings = kwargs.get("filesMarkings", None) no_trigger_import = kwargs.get("noTriggerImport", None) embedded = kwargs.get("embedded", None) if source_name is not None or url is not None: self.opencti.app_logger.info( "Creating External Reference", {"source_name": source_name} ) query = ( """ mutation ExternalReferenceAdd($input: ExternalReferenceAddInput!) { externalReferenceAdd(input: $input) { """ + self.properties + """ } } """ ) input_variables = { "stix_id": stix_id, "created": created, "modified": modified, "source_name": source_name, "external_id": external_id, "description": description, "url": url, "x_opencti_stix_ids": x_opencti_stix_ids, "update": update, "files": files, "filesMarkings": files_markings, "noTriggerImport": no_trigger_import, "embedded": embedded, } result = self.opencti.query(query, {"input": input_variables}) return self.opencti.process_multiple_fields( result["data"]["externalReferenceAdd"] ) else: self.opencti.app_logger.error( "[opencti_external_reference] Missing parameters: source_name and url" ) return None
[docs] def add_file(self, **kwargs): """Upload a file in this External-Reference. :param id: the External-Reference id :type id: str :param file_name: the name of the file to upload :type file_name: str :param data: the file data (if None, reads from file_name path) :type data: bytes or None :param version: (optional) the file version date :type version: datetime :param fileMarkings: (optional) list of marking definition IDs for the file :type fileMarkings: list :param mime_type: (optional) MIME type (default: text/plain) :type mime_type: str :param no_trigger_import: (optional) don't trigger import (default: False) :type no_trigger_import: bool :param embedded: (optional) embed the file (default: False) :type embedded: bool :return: File upload result :rtype: dict or None """ id = kwargs.get("id", None) file_name = kwargs.get("file_name", None) data = kwargs.get("data", None) version = kwargs.get("version", None) file_markings = kwargs.get("fileMarkings", None) mime_type = kwargs.get("mime_type", "text/plain") no_trigger_import = kwargs.get("no_trigger_import", False) embedded = kwargs.get("embedded", False) if id is not None and file_name is not None: final_file_name = os.path.basename(file_name) query = """ mutation ExternalReferenceEdit($id: ID!, $file: Upload!, $fileMarkings: [String], $version: DateTime, $noTriggerImport: Boolean, $embedded: Boolean) { externalReferenceEdit(id: $id) { importPush(file: $file, fileMarkings: $fileMarkings, version: $version, noTriggerImport: $noTriggerImport, embedded: $embedded) { id name } } } """ if data is None: data = open(file_name, "rb") if file_name.endswith(".json"): mime_type = "application/json" else: mime_type = magic.from_file(file_name, mime=True) self.opencti.app_logger.info( "Uploading a file in External-Reference", {"file": final_file_name, "id": id}, ) return self.opencti.query( query, { "id": id, "file": (self.opencti.file(final_file_name, data, mime_type)), "fileMarkings": file_markings, "version": version, "noTriggerImport": ( no_trigger_import if isinstance(no_trigger_import, bool) else no_trigger_import == "True" ), "embedded": embedded, }, ) else: self.opencti.app_logger.error( "[opencti_external_reference] Missing parameters: id or file_name" ) return None
[docs] def update_field(self, **kwargs): """Update an External Reference object field. :param id: the External Reference id :type id: str :param input: the input of the field :type input: list :return: The updated External Reference object :rtype: dict or None """ id = kwargs.get("id", None) input = kwargs.get("input", None) if id is not None and input is not None: self.opencti.app_logger.info("Updating External-Reference", {"id": id}) query = """ mutation ExternalReferenceEdit($id: ID!, $input: [EditInput]!) { externalReferenceEdit(id: $id) { fieldPatch(input: $input) { id } } } """ result = self.opencti.query(query, {"id": id, "input": input}) return self.opencti.process_multiple_fields( result["data"]["externalReferenceEdit"]["fieldPatch"] ) else: self.opencti.app_logger.error( "[opencti_external_reference] Missing parameters: id and key and value" ) return None
[docs] def delete(self, id): """Delete an External-Reference object. :param id: the id of the External-Reference to delete :type id: str :return: None """ self.opencti.app_logger.info("Deleting External-Reference", {"id": id}) query = """ mutation ExternalReferenceEdit($id: ID!) { externalReferenceEdit(id: $id) { delete } } """ self.opencti.query(query, {"id": id})
[docs] def list_files(self, **kwargs): """List files attached to an External-Reference. :param id: the id of the External-Reference :type id: str :return: List of files :rtype: list """ id = kwargs.get("id", None) self.opencti.app_logger.debug("Listing files of External-Reference", {"id": id}) query = """ query externalReference($id: String!) { externalReference(id: $id) { importFiles { edges { node { id name size metaData { mimetype version } } } } } } """ result = self.opencti.query(query, {"id": id}) entity = self.opencti.process_multiple_fields( result["data"]["externalReference"] ) return entity["importFiles"]