Source code for pycti.entities.opencti_malware_analysis

# coding: utf-8

import json
import uuid

from stix2.canonicalization.Canonicalize import canonicalize


class MalwareAnalysis:
    """Main MalwareAnalysis class for OpenCTI

    Manages malware analysis reports and results in the OpenCTI platform.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    def __init__(self, opencti):
        """Initialize the MalwareAnalysis instance.

        Stores the API client and builds the two GraphQL selection sets used
        by :meth:`list` and :meth:`read`: ``properties`` (default) and
        ``properties_with_files`` (same fields plus ``importFiles`` on the
        entity and on each external reference).

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        self.opencti = opencti

        # Both selection sets share everything up to the last scalar field of
        # an external-reference node, and everything from the closing of
        # ``externalReferences`` through ``modules``. They are assembled from
        # these fragments so the shared part is written only once.
        common_head = (
            " id standard_id entity_type parent_types spec_version"
            " created_at updated_at"
            " status { id template { id name color } }"
            " createdBy {"
            " ... on Identity {"
            " id standard_id entity_type parent_types spec_version"
            " identity_class name description roles contact_information"
            " x_opencti_aliases created modified"
            " objectLabel { id value color }"
            " }"
            " ... on Organization {"
            " x_opencti_organization_type x_opencti_reliability }"
            " ... on Individual { x_opencti_firstname x_opencti_lastname }"
            " }"
            " objectOrganization { id standard_id name }"
            " objectMarking {"
            " id standard_id entity_type definition_type definition"
            " created modified x_opencti_order x_opencti_color }"
            " objectLabel { id value color }"
            " externalReferences { edges { node {"
            " id standard_id entity_type source_name description url hash"
            " external_id created modified"
        )
        common_tail = (
            " } } }"
            " revoked confidence created modified"
            " product result_name result submitted"
            " analysis_started analysis_ended"
            " version configuration_version"
            " analysis_engine_version analysis_definition_version"
            " modules "
        )
        # Files attached to each external reference (with-files variant only).
        reference_files = (
            " importFiles { edges { node {"
            " id name size metaData { mimetype version } } } }"
        )
        # Files attached to the malware analysis itself, with their markings.
        entity_files = (
            "importFiles { edges { node {"
            " id name size metaData { mimetype version }"
            " objectMarking {"
            " id standard_id entity_type definition_type definition"
            " created modified x_opencti_order x_opencti_color }"
            " } } } "
        )

        self.properties = common_head + common_tail
        self.properties_with_files = (
            common_head + reference_files + common_tail + entity_files
        )

    @staticmethod
    def generate_id(result_name, product=None, submitted=None):
        """Generate a STIX ID for a Malware Analysis.

        The ID is a UUIDv5, in a fixed namespace, of the canonicalized
        ``result_name`` / ``product`` (and ``submitted`` when given) mapping.
        ``result_name`` is lower-cased and stripped first; ``product`` and
        ``submitted`` are used as provided.

        :param result_name: the result name of the analysis
        :type result_name: str
        :param product: the product that performed the analysis (optional)
        :type product: str
        :param submitted: the submission date (optional)
        :type submitted: str
        :return: STIX ID for the Malware Analysis
        :rtype: str
        """
        seed = {"result_name": result_name.lower().strip(), "product": product}
        # ``submitted`` only takes part in the ID when it is provided, so IDs
        # generated without it are unaffected.
        if submitted is not None:
            seed["submitted"] = submitted
        canonical = canonicalize(seed, utf8=False)
        namespace = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
        return "malware-analysis--" + str(uuid.uuid5(namespace, canonical))

    @staticmethod
    def generate_id_from_data(data):
        """Generate a STIX ID from Malware Analysis data.

        :param data: Dictionary containing 'result_name' and optionally
            'product' and 'submitted' keys
        :type data: dict
        :return: STIX ID for the Malware Analysis
        :rtype: str
        :raises KeyError: if 'result_name' is missing from ``data``
        """
        # ``product`` is optional in generate_id, so a missing key is treated
        # like an explicit None instead of raising.
        return MalwareAnalysis.generate_id(
            data["result_name"], data.get("product"), data.get("submitted")
        )

    def list(self, **kwargs):
        """List Malware analysis objects.

        :param filters: the filters to apply
        :type filters: dict
        :param search: the search keyword
        :type search: str
        :param first: return the first n rows from the after ID (or the
            beginning if not set), defaults to 500
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param orderBy: (optional) field to order the results by
        :type orderBy: str
        :param orderMode: (optional) ordering mode
        :type orderMode: str
        :param customAttributes: (optional) GraphQL selection used instead of
            the default properties
        :type customAttributes: str
        :param getAll: (optional) follow pagination and return every page
            (default: False); ``withPagination`` is not applied in this mode
        :type getAll: bool
        :param withPagination: (optional) forwarded to the client's
            ``process_multiple`` when ``getAll`` is False (default: False)
        :type withPagination: bool
        :param withFiles: (optional) select the properties including import
            files (default: False); ignored when ``customAttributes`` is set
        :type withFiles: bool
        :return: List of MalwareAnalysis objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        search = kwargs.get("search", None)
        first = kwargs.get("first", 500)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("withPagination", False)
        with_files = kwargs.get("withFiles", False)

        self.opencti.app_logger.info(
            "Listing Malware analyses with filters", {"filters": json.dumps(filters)}
        )

        if custom_attributes is not None:
            selection = custom_attributes
        elif with_files:
            selection = self.properties_with_files
        else:
            selection = self.properties

        query = (
            " query MalwareAnalyses($filters: FilterGroup, $search: String,"
            " $first: Int, $after: ID, $orderBy: MalwareAnalysesOrdering,"
            " $orderMode: OrderingMode) {"
            " malwareAnalyses(filters: $filters, search: $search,"
            " first: $first, after: $after, orderBy: $orderBy,"
            " orderMode: $orderMode) { edges { node { "
            + selection
            + " } } pageInfo { startCursor endCursor hasNextPage"
            " hasPreviousPage globalCount } } } "
        )

        def fetch_page(cursor):
            # A fresh variables dict per request; only the cursor changes.
            return self.opencti.query(
                query,
                {
                    "filters": filters,
                    "search": search,
                    "first": first,
                    "after": cursor,
                    "orderBy": order_by,
                    "orderMode": order_mode,
                },
            )

        result = fetch_page(after)
        if not get_all:
            return self.opencti.process_multiple(
                result["data"]["malwareAnalyses"], with_pagination
            )

        final_data = []
        final_data.extend(
            self.opencti.process_multiple(result["data"]["malwareAnalyses"])
        )
        while result["data"]["malwareAnalyses"]["pageInfo"]["hasNextPage"]:
            after = result["data"]["malwareAnalyses"]["pageInfo"]["endCursor"]
            self.opencti.app_logger.debug("Listing Malware analyses", {"after": after})
            result = fetch_page(after)
            final_data.extend(
                self.opencti.process_multiple(result["data"]["malwareAnalyses"])
            )
        return final_data

    def read(self, **kwargs):
        """Read a Malware analysis object.

        :param id: the id of the Malware analysis
        :type id: str
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :param customAttributes: (optional) GraphQL selection used instead of
            the default properties
        :type customAttributes: str
        :param withFiles: (optional) select the properties including import
            files (default: False)
        :type withFiles: bool
        :return: Malware analysis object
        :rtype: dict or None
        """
        entity_id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)
        custom_attributes = kwargs.get("customAttributes", None)
        with_files = kwargs.get("withFiles", False)

        if entity_id is not None:
            self.opencti.app_logger.info("Reading Malware analysis", {"id": entity_id})
            if custom_attributes is not None:
                selection = custom_attributes
            elif with_files:
                selection = self.properties_with_files
            else:
                selection = self.properties
            query = (
                " query MalwareAnalysis($id: String!) {"
                " malwareAnalysis(id: $id) { " + selection + " } } "
            )
            result = self.opencti.query(query, {"id": entity_id})
            return self.opencti.process_multiple_fields(
                result["data"]["malwareAnalysis"]
            )

        if filters is not None:
            # Forward the selection options so the filter lookup returns the
            # same fields as the lookup by id.
            matches = self.list(
                filters=filters,
                customAttributes=custom_attributes,
                withFiles=with_files,
            )
            return matches[0] if len(matches) > 0 else None

        self.opencti.app_logger.error(
            "[opencti_malware_analysis] Missing parameters: id or filters"
        )
        return None

    def create(self, **kwargs):
        """Create a Malware analysis object.

        :param product: the product that performed the analysis (required)
        :type product: str
        :param result_name: the result name of the analysis (required)
        :type result_name: str
        :param stix_id: (optional) the STIX ID
        :type stix_id: str
        :param createdBy: (optional) the author ID
        :type createdBy: str
        :param objectMarking: (optional) list of marking definition IDs
        :type objectMarking: list
        :param objectLabel: (optional) list of label IDs
        :type objectLabel: list
        :param externalReferences: (optional) list of external reference IDs
        :type externalReferences: list
        :param revoked: (optional) whether the malware analysis is revoked
        :type revoked: bool
        :param confidence: (optional) confidence level (0-100)
        :type confidence: int
        :param lang: (optional) language
        :type lang: str
        :param created: (optional) creation date
        :type created: str
        :param modified: (optional) modification date
        :type modified: str
        :param result: (optional) result of the analysis
        :type result: str
        :param submitted: (optional) submission date
        :type submitted: str
        :param analysis_started: (optional) analysis start date
        :type analysis_started: str
        :param analysis_ended: (optional) analysis end date
        :type analysis_ended: str
        :param version: (optional) version of the analysis
        :type version: str
        :param configuration_version: (optional) configuration version
        :type configuration_version: str
        :param analysis_engine_version: (optional) analysis engine version
        :type analysis_engine_version: str
        :param analysis_definition_version: (optional) analysis definition version
        :type analysis_definition_version: str
        :param modules: (optional) list of analysis modules
        :type modules: list
        :param hostVm: (optional) host VM reference ID
        :type hostVm: str
        :param operatingSystem: (optional) operating system reference ID
        :type operatingSystem: str
        :param installedSoftware: (optional) list of installed software reference IDs
        :type installedSoftware: list
        :param sample: (optional) sample reference ID (sent as ``analysisSample``)
        :type sample: str
        :param analysisSco: (optional) list of analysis SCO reference IDs
        :type analysisSco: list
        :param x_opencti_stix_ids: (optional) list of additional STIX IDs
        :type x_opencti_stix_ids: list
        :param objectOrganization: (optional) list of organization IDs
        :type objectOrganization: list
        :param x_opencti_workflow_id: (optional) workflow ID
        :type x_opencti_workflow_id: str
        :param x_opencti_modified_at: (optional) custom modification date
        :type x_opencti_modified_at: str
        :param update: (optional) whether to update if exists (default: False)
        :type update: bool
        :param files: (optional) list of File objects to attach
        :type files: list
        :param filesMarkings: (optional) list of lists of marking definition IDs for each file
        :type filesMarkings: list
        :param noTriggerImport: (optional) forwarded unchanged to the API
            input as ``noTriggerImport``
        :param embedded: (optional) forwarded unchanged to the API input as
            ``embedded``
        :param upsert_operations: (optional) forwarded unchanged to the API
            input as ``upsertOperations``
        :return: Malware analysis object, or None when ``product`` or
            ``result_name`` is missing
        :rtype: dict or None
        """
        stix_id = kwargs.get("stix_id", None)
        created_by = kwargs.get("createdBy", None)
        object_marking = kwargs.get("objectMarking", None)
        object_label = kwargs.get("objectLabel", None)
        external_references = kwargs.get("externalReferences", None)
        revoked = kwargs.get("revoked", None)
        confidence = kwargs.get("confidence", None)
        lang = kwargs.get("lang", None)
        created = kwargs.get("created", None)
        modified = kwargs.get("modified", None)
        product = kwargs.get("product", None)
        result_name = kwargs.get("result_name", None)
        analysis_result = kwargs.get("result", None)
        submitted = kwargs.get("submitted", None)
        analysis_started = kwargs.get("analysis_started", None)
        analysis_ended = kwargs.get("analysis_ended", None)
        version = kwargs.get("version", None)
        configuration_version = kwargs.get("configuration_version", None)
        analysis_engine_version = kwargs.get("analysis_engine_version", None)
        analysis_definition_version = kwargs.get("analysis_definition_version", None)
        modules = kwargs.get("modules", None)
        host_vm = kwargs.get("hostVm", None)
        operating_system = kwargs.get("operatingSystem", None)
        installed_software = kwargs.get("installedSoftware", None)
        sample = kwargs.get("sample", None)
        analysis_sco = kwargs.get("analysisSco", None)
        x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
        granted_refs = kwargs.get("objectOrganization", None)
        x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
        x_opencti_modified_at = kwargs.get("x_opencti_modified_at", None)
        update = kwargs.get("update", False)
        files = kwargs.get("files", None)
        files_markings = kwargs.get("filesMarkings", None)
        no_trigger_import = kwargs.get("noTriggerImport", None)
        embedded = kwargs.get("embedded", None)
        upsert_operations = kwargs.get("upsert_operations", None)

        if product is None or result_name is None:
            self.opencti.app_logger.error(
                "[opencti_malware_analysis] Missing parameters: product and result_name"
            )
            return None

        self.opencti.app_logger.info("Creating Malware analysis", {"product": product})
        query = (
            " mutation MalwareAnalysisAdd($input: MalwareAnalysisAddInput!) {"
            " malwareAnalysisAdd(input: $input) {"
            " id standard_id entity_type parent_types } } "
        )
        response = self.opencti.query(
            query,
            {
                "input": {
                    "stix_id": stix_id,
                    "createdBy": created_by,
                    "objectMarking": object_marking,
                    "objectLabel": object_label,
                    "objectOrganization": granted_refs,
                    "externalReferences": external_references,
                    "revoked": revoked,
                    "confidence": confidence,
                    "lang": lang,
                    "created": created,
                    "modified": modified,
                    "product": product,
                    "result_name": result_name,
                    "result": analysis_result,
                    "submitted": submitted,
                    "analysis_started": analysis_started,
                    "analysis_ended": analysis_ended,
                    "version": version,
                    "configuration_version": configuration_version,
                    "analysis_engine_version": analysis_engine_version,
                    "analysis_definition_version": analysis_definition_version,
                    "modules": modules,
                    "hostVm": host_vm,
                    "operatingSystem": operating_system,
                    "installedSoftware": installed_software,
                    "analysisSample": sample,
                    "analysisSco": analysis_sco,
                    "x_opencti_stix_ids": x_opencti_stix_ids,
                    "x_opencti_workflow_id": x_opencti_workflow_id,
                    "x_opencti_modified_at": x_opencti_modified_at,
                    "update": update,
                    "files": files,
                    "filesMarkings": files_markings,
                    "noTriggerImport": no_trigger_import,
                    "embedded": embedded,
                    "upsertOperations": upsert_operations,
                }
            },
        )
        return self.opencti.process_multiple_fields(
            response["data"]["malwareAnalysisAdd"]
        )

    def import_from_stix2(self, **kwargs):
        """Import a Malware analysis object from a STIX2 object.

        Note: ``stixObject`` is modified in place. Any of the keys
        ``x_opencti_stix_ids``, ``x_opencti_granted_refs``,
        ``x_opencti_workflow_id``, ``x_opencti_modified_at`` and
        ``opencti_upsert_operations`` that is absent is filled from the
        object's extensions before the object is created.

        :param stixObject: the Stix-Object Malware analysis
        :type stixObject: dict
        :param extras: additional parameters like created_by_id,
            object_marking_ids; ``None`` is treated as an empty dict
        :type extras: dict
        :param update: whether to update existing object
        :type update: bool
        :return: Malware analysis object
        :rtype: dict or None
        :raises KeyError: if ``stixObject`` has no ``id`` key
        """
        stix_object = kwargs.get("stixObject", None)
        # An explicit extras=None must behave like a missing argument.
        extras = kwargs.get("extras") or {}
        update = kwargs.get("update", False)

        if stix_object is None:
            self.opencti.app_logger.error(
                "[opencti_malware_analysis] Missing parameters: stixObject"
            )
            return None

        # Search in extensions: (key on the STIX object, key in the extension)
        extension_fields = (
            ("x_opencti_stix_ids", "stix_ids"),
            ("x_opencti_granted_refs", "granted_refs"),
            ("x_opencti_workflow_id", "workflow_id"),
            ("x_opencti_modified_at", "modified_at"),
            ("opencti_upsert_operations", "opencti_upsert_operations"),
        )
        for object_key, extension_key in extension_fields:
            if object_key not in stix_object:
                stix_object[object_key] = self.opencti.get_attribute_in_extension(
                    extension_key, stix_object
                )

        return self.create(
            stix_id=stix_object["id"],
            createdBy=extras.get("created_by_id"),
            objectMarking=extras.get("object_marking_ids"),
            objectLabel=extras.get("object_label_ids"),
            externalReferences=extras.get("external_references_ids"),
            revoked=stix_object.get("revoked"),
            confidence=stix_object.get("confidence"),
            lang=stix_object.get("lang"),
            created=stix_object.get("created"),
            modified=stix_object.get("modified"),
            product=stix_object.get("product"),
            result_name=stix_object.get("result_name"),
            result=stix_object.get("result"),
            submitted=stix_object.get("submitted"),
            analysis_started=stix_object.get("analysis_started"),
            analysis_ended=stix_object.get("analysis_ended"),
            version=stix_object.get("version"),
            configuration_version=stix_object.get("configuration_version"),
            analysis_engine_version=stix_object.get("analysis_engine_version"),
            analysis_definition_version=stix_object.get(
                "analysis_definition_version"
            ),
            operatingSystem=stix_object.get("operating_system_ref"),
            analysisSco=stix_object.get("analysis_sco_refs"),
            hostVm=stix_object.get("host_vm_ref"),
            installedSoftware=stix_object.get("installed_software_refs"),
            sample=stix_object.get("sample_ref"),
            modules=stix_object.get("modules"),
            x_opencti_stix_ids=stix_object.get("x_opencti_stix_ids"),
            objectOrganization=stix_object.get("x_opencti_granted_refs"),
            x_opencti_workflow_id=stix_object.get("x_opencti_workflow_id"),
            x_opencti_modified_at=stix_object.get("x_opencti_modified_at"),
            update=update,
            files=extras.get("files"),
            filesMarkings=extras.get("filesMarkings"),
            noTriggerImport=extras.get("noTriggerImport", None),
            embedded=extras.get("embedded", None),
            upsert_operations=stix_object.get("opencti_upsert_operations"),
        )