Source code for pycti.entities.opencti_security_coverage

# coding: utf-8

import json
import uuid

from stix2.canonicalization.Canonicalize import canonicalize


[docs] class SecurityCoverage: """Main SecurityCoverage class for OpenCTI Manages security coverage entities in the OpenCTI platform. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """ def __init__(self, opencti): """Initialize the SecurityCoverage instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types spec_version created_at updated_at external_uri objectCovered { __typename ... on StixCoreObject { id } } objectMarking { id standard_id entity_type definition_type definition created modified x_opencti_order x_opencti_color } """
@staticmethod
[docs] def generate_id(covered_ref): """Generate a STIX ID for a Security Coverage. :param covered_ref: The reference to the covered object :type covered_ref: str :return: STIX ID for the security coverage :rtype: str """ data = {"covered_ref": covered_ref.lower().strip()} data = canonicalize(data, utf8=False) id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)) return "security-coverage--" + id
@staticmethod
[docs] def generate_id_from_data(data): """Generate a STIX ID from security coverage data. :param data: Dictionary containing 'covered_ref' key :type data: dict :return: STIX ID for the security coverage :rtype: str """ return SecurityCoverage.generate_id(data["covered_ref"])
[docs] def list(self, **kwargs): """List SecurityCoverage objects. :param filters: the filters to apply :param search: the search keyword :param first: return the first n rows from the after ID (or the beginning if not set) :param after: ID of the first row for pagination :return: List of SecurityCoverage objects :rtype: list """ filters = kwargs.get("filters", None) search = kwargs.get("search", None) first = kwargs.get("first", 100) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) get_all = kwargs.get("getAll", False) with_pagination = kwargs.get("withPagination", False) self.opencti.app_logger.info( "Listing SecurityCoverage with filters", {"filters": json.dumps(filters)} ) query = ( """ query SecurityCoverage($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityCoverageOrdering, $orderMode: OrderingMode) { securityCoverages(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ + (custom_attributes if custom_attributes is not None else self.properties) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) if get_all: final_data = [] data = self.opencti.process_multiple(result["data"]["securityCoverages"]) final_data = final_data + data while result["data"]["securityCoverages"]["pageInfo"]["hasNextPage"]: after = result["data"]["securityCoverages"]["pageInfo"]["endCursor"] self.opencti.app_logger.info( "Listing SecurityCoverage", {"after": after} ) result = self.opencti.query( query, { "filters": filters, "search": search, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) data = self.opencti.process_multiple( result["data"]["securityCoverages"] ) final_data = final_data + data return final_data else: return self.opencti.process_multiple( result["data"]["securityCoverages"], with_pagination )
[docs] def read(self, **kwargs): """Read a SecurityCoverage object. :param id: the id of the SecurityCoverage :type id: str :param filters: the filters to apply if no id provided :type filters: dict :param customAttributes: custom attributes to return :type customAttributes: str :return: SecurityCoverage object :rtype: dict or None """ id = kwargs.get("id", None) filters = kwargs.get("filters", None) custom_attributes = kwargs.get("customAttributes", None) if id is not None: self.opencti.app_logger.info("Reading SecurityCoverage", {"id": id}) query = ( """ query SecurityCoverage($id: String!) { securityCoverage(id: $id) { """ + ( custom_attributes if custom_attributes is not None else self.properties ) + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( result["data"]["securityCoverage"] ) elif filters is not None: result = self.list(filters=filters) if len(result) > 0: return result[0] else: return None else: self.opencti.app_logger.error( "[opencti_security_coverage] Missing parameters: id or filters" ) return None
[docs] def create(self, **kwargs): """Create a Security coverage object. :param name: the name of the Security Coverage (required) :type name: str :param objectCovered: the covered object ID (required) :type objectCovered: str :param stix_id: (optional) the STIX ID :type stix_id: str :param description: (optional) description :type description: str :param createdBy: (optional) the author ID :type createdBy: str :param objectMarking: (optional) list of marking definition IDs :type objectMarking: list :param objectLabel: (optional) list of label IDs :type objectLabel: list :param externalReferences: (optional) list of external reference IDs :type externalReferences: list :param external_uri: (optional) external URI :type external_uri: str :param coverage_last_result: (optional) last result date :type coverage_last_result: str :param coverage_valid_from: (optional) valid from date :type coverage_valid_from: str :param coverage_valid_to: (optional) valid to date :type coverage_valid_to: str :param coverage_information: (optional) coverage information :type coverage_information: list :param auto_enrichment_disable: (optional) disable auto enrichment :type auto_enrichment_disable: bool :param periodicity: (optional) periodicity :type periodicity: str :param duration: (optional) duration :type duration: str :param type_affinity: (optional) type affinity :type type_affinity: str :param platforms_affinity: (optional) platforms affinity :type platforms_affinity: list :param files: (optional) list of File objects to attach :type files: list :param filesMarkings: (optional) list of lists of marking definition IDs for each file :type filesMarkings: list :return: Security Coverage object :rtype: dict or None """ stix_id = kwargs.get("stix_id", None) name = kwargs.get("name", None) description = kwargs.get("description", None) created_by = kwargs.get("createdBy", None) object_marking = kwargs.get("objectMarking", None) object_label = kwargs.get("objectLabel", None) object_covered = kwargs.get("objectCovered", None) external_references = kwargs.get("externalReferences", None) external_uri = kwargs.get("external_uri", None) coverage_last_result = kwargs.get("coverage_last_result", None) coverage_valid_from = kwargs.get("coverage_valid_from", None) coverage_valid_to = kwargs.get("coverage_valid_to", None) coverage_information = kwargs.get("coverage_information", None) auto_enrichment_disable = kwargs.get("auto_enrichment_disable", None) periodicity = kwargs.get("periodicity", None) duration = kwargs.get("duration", None) type_affinity = kwargs.get("type_affinity", None) platforms_affinity = kwargs.get("platforms_affinity", None) x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None) files = kwargs.get("files", None) files_markings = kwargs.get("filesMarkings", None) no_trigger_import = kwargs.get("noTriggerImport", None) embedded = kwargs.get("embedded", None) if name is not None and object_covered is not None: self.opencti.app_logger.info("Creating Security Coverage", {"name": name}) query = """ mutation SecurityCoverageAdd($input: SecurityCoverageAddInput!) { securityCoverageAdd(input: $input) { id standard_id entity_type parent_types } } """ result = self.opencti.query( query, { "input": { "stix_id": stix_id, "name": name, "description": description, "createdBy": created_by, "objectMarking": object_marking, "objectLabel": object_label, "objectCovered": object_covered, "external_uri": external_uri, "externalReferences": external_references, "coverage_last_result": coverage_last_result, "coverage_valid_from": coverage_valid_from, "coverage_valid_to": coverage_valid_to, "coverage_information": coverage_information, "auto_enrichment_disable": auto_enrichment_disable, "periodicity": periodicity, "duration": duration, "type_affinity": type_affinity, "platforms_affinity": platforms_affinity, "x_opencti_stix_ids": x_opencti_stix_ids, "files": files, "filesMarkings": files_markings, "noTriggerImport": no_trigger_import, "embedded": embedded, } }, ) return self.opencti.process_multiple_fields( result["data"]["securityCoverageAdd"] ) else: self.opencti.app_logger.error( "[opencti_security_coverage] " "Missing parameters: name or object_covered" ) return None
[docs] def import_from_stix2(self, **kwargs): """Import a Security coverage from a STIX2 object. :param stixObject: the STIX2 Security coverage object :type stixObject: dict :param extras: extra parameters including created_by_id, object_marking_ids, etc. :type extras: dict :param update: whether to update if the entity already exists :type update: bool :return: Security coverage object :rtype: dict or None """ stix_object = kwargs.get("stixObject", None) extras = kwargs.get("extras", {}) if stix_object is not None: # Search in extensions if "x_opencti_stix_ids" not in stix_object: stix_object["x_opencti_stix_ids"] = ( self.opencti.get_attribute_in_extension("stix_ids", stix_object) ) if "x_opencti_granted_refs" not in stix_object: stix_object["x_opencti_granted_refs"] = ( self.opencti.get_attribute_in_extension("granted_refs", stix_object) ) raw_coverages = stix_object["coverage"] if "coverage" in stix_object else [] coverage_information = list( map( lambda cov: { "coverage_name": cov["name"], "coverage_score": cov["score"], }, raw_coverages, ) ) return self.create( stix_id=stix_object["id"], name=stix_object["name"], external_uri=( stix_object["external_uri"] if "external_uri" in stix_object else None ), auto_enrichment_disable=( stix_object["auto_enrichment_disable"] if "auto_enrichment_disable" in stix_object else False ), periodicity=( stix_object["periodicity"] if "periodicity" in stix_object else None ), duration=( stix_object["duration"] if "duration" in stix_object else None ), type_affinity=( stix_object["type_affinity"] if "type_affinity" in stix_object else None ), platforms_affinity=( stix_object["platforms_affinity"] if "platforms_affinity" in stix_object else None ), coverage_last_result=( stix_object["last_result"] if "last_result" in stix_object else None ), coverage_valid_from=( stix_object["valid_from"] if "valid_from" in stix_object else None ), coverage_valid_to=( stix_object["valid_to"] if "valid_to" in stix_object else None ), coverage_information=coverage_information, description=( self.opencti.stix2.convert_markdown(stix_object["description"]) if "description" in stix_object else None ), createdBy=( extras["created_by_id"] if "created_by_id" in extras else None ), objectMarking=( extras["object_marking_ids"] if "object_marking_ids" in extras else None ), objectLabel=( extras["object_label_ids"] if "object_label_ids" in extras else None ), objectCovered=( stix_object["covered_ref"] if "covered_ref" in stix_object else None ), externalReferences=( extras["external_references_ids"] if "external_references_ids" in extras else None ), x_opencti_stix_ids=( stix_object["x_opencti_stix_ids"] if "x_opencti_stix_ids" in stix_object else None ), files=extras.get("files"), filesMarkings=extras.get("filesMarkings"), noTriggerImport=extras.get("noTriggerImport", None), embedded=extras.get("embedded", None), ) else: self.opencti.app_logger.error( "[opencti_security_coverage] Missing parameters: stixObject" ) return None