# Source code for pycti.entities.opencti_marking_definition

# coding: utf-8

import json
import uuid

from stix2.canonicalization.Canonicalize import canonicalize


class MarkingDefinition:
    """Main MarkingDefinition class for OpenCTI.

    Manages marking definitions (TLP, statements) in the OpenCTI platform.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    # Fixed STIX IDs returned by ``generate_id`` for the standard TLP
    # markings. TLP:WHITE is an alias of TLP:CLEAR and shares its ID.
    _STATIC_TLP_IDS = {
        "TLP:CLEAR": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
        "TLP:WHITE": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
        "TLP:GREEN": "marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
        "TLP:AMBER": "marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
        "TLP:AMBER+STRICT": "marking-definition--826578e1-40ad-459f-bc73-ede076f81f37",
        "TLP:RED": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
    }

    def __init__(self, opencti):
        """Initialize the MarkingDefinition instance.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        self.opencti = opencti
        # GraphQL selection set requested by default for a marking definition.
        self.properties = """
            id
            standard_id
            entity_type
            parent_types
            definition_type
            definition
            x_opencti_order
            x_opencti_color
            created
            modified
            created_at
            updated_at
        """

    @staticmethod
    def generate_id(definition_type, definition):
        """Generate a STIX ID for a Marking Definition.

        :param definition_type: The type of marking (TLP, statement, etc.)
        :type definition_type: str
        :param definition: The definition value
        :type definition: str
        :return: STIX ID for the marking definition
        :rtype: str
        """
        # Handle static IDs from OpenCTI
        if definition_type == "TLP" and isinstance(definition, str):
            static_id = MarkingDefinition._STATIC_TLP_IDS.get(definition)
            if static_id is not None:
                return static_id
        # Generate IDs: UUIDv5 over the canonicalized type/definition pair.
        data = {"definition_type": definition_type, "definition": definition}
        data = canonicalize(data, utf8=False)
        generated = str(
            uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)
        )
        return "marking-definition--" + generated

    @staticmethod
    def generate_id_from_data(data):
        """Generate a STIX ID from marking definition data.

        :param data: Dictionary containing 'definition_type' and 'definition' keys
        :type data: dict
        :return: STIX ID for the marking definition
        :rtype: str
        :raises KeyError: if one of the two keys is missing from ``data``
        """
        return MarkingDefinition.generate_id(
            data["definition_type"], data["definition"]
        )

    def list(self, **kwargs):
        """List Marking-Definition objects.

        :param filters: the filters to apply
        :type filters: dict
        :param first: return the first n rows from the after ID
            (or the beginning if not set), defaults to 500
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param orderBy: field to order results by
        :type orderBy: str
        :param orderMode: ordering mode (asc/desc)
        :type orderMode: str
        :param customAttributes: custom attributes to return; when given, this
            string replaces the default selection set in the query
        :type customAttributes: str
        :param withPagination: whether to include pagination info
        :type withPagination: bool
        :return: List of Marking-Definition objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        first = kwargs.get("first", 500)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        with_pagination = kwargs.get("withPagination", False)

        self.opencti.app_logger.info(
            "Listing Marking-Definitions with filters",
            {"filters": json.dumps(filters)},
        )
        query = (
            """
            query MarkingDefinitions($filters: FilterGroup, $first: Int, $after: ID, $orderBy: MarkingDefinitionsOrdering, $orderMode: OrderingMode) {
                markingDefinitions(filters: $filters, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
                            """
            + (custom_attributes if custom_attributes is not None else self.properties)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
        """
        )
        result = self.opencti.query(
            query,
            {
                "filters": filters,
                "first": first,
                "after": after,
                "orderBy": order_by,
                "orderMode": order_mode,
            },
        )
        return self.opencti.process_multiple(
            result["data"]["markingDefinitions"], with_pagination
        )

    def read(self, **kwargs):
        """Read a Marking-Definition object.

        When ``id`` is given it takes precedence; otherwise the first result
        of ``list(filters=...)`` is returned.

        :param id: the id of the Marking-Definition
        :type id: str
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :return: Marking-Definition object
        :rtype: dict or None
        """
        marking_id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)

        if marking_id is not None:
            self.opencti.app_logger.info(
                "Reading Marking-Definition", {"id": marking_id}
            )
            query = (
                """
                query MarkingDefinition($id: String!) {
                    markingDefinition(id: $id) {
                        """
                + self.properties
                + """
                    }
                }
            """
            )
            result = self.opencti.query(query, {"id": marking_id})
            return self.opencti.process_multiple_fields(
                result["data"]["markingDefinition"]
            )

        if filters is not None:
            result = self.list(filters=filters)
            if len(result) > 0:
                return result[0]
            return None

        self.opencti.app_logger.error(
            "[opencti_marking_definition] Missing parameters: id or filters"
        )
        return None

    def create(self, **kwargs):
        """Create a Marking-Definition object.

        :param stix_id: (optional) the STIX ID
        :type stix_id: str
        :param created: (optional) creation date
        :type created: datetime
        :param modified: (optional) modification date
        :type modified: datetime
        :param definition_type: the definition type (required)
        :type definition_type: str
        :param definition: the definition value (required)
        :type definition: str
        :param x_opencti_order: (optional) order (default: 0)
        :type x_opencti_order: int
        :param x_opencti_color: (optional) color
        :type x_opencti_color: str
        :param x_opencti_stix_ids: (optional) list of additional STIX IDs
        :type x_opencti_stix_ids: list
        :param update: (optional) whether to update if exists (default: False)
        :type update: bool
        :return: Marking-Definition object, or None (with an error logged)
            when ``definition`` or ``definition_type`` is missing
        :rtype: dict or None
        """
        stix_id = kwargs.get("stix_id", None)
        created = kwargs.get("created", None)
        modified = kwargs.get("modified", None)
        definition_type = kwargs.get("definition_type", None)
        definition = kwargs.get("definition", None)
        x_opencti_order = kwargs.get("x_opencti_order", 0)
        x_opencti_color = kwargs.get("x_opencti_color", None)
        x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
        update = kwargs.get("update", False)

        if definition is None or definition_type is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: definition and definition_type",
            )
            return None

        query = (
            """
            mutation MarkingDefinitionAdd($input: MarkingDefinitionAddInput!) {
                markingDefinitionAdd(input: $input) {
                    """
            + self.properties
            + """
                }
            }
        """
        )
        result = self.opencti.query(
            query,
            {
                "input": {
                    "definition_type": definition_type,
                    "definition": definition,
                    "x_opencti_order": x_opencti_order,
                    "x_opencti_color": x_opencti_color,
                    "stix_id": stix_id,
                    "created": created,
                    "modified": modified,
                    "x_opencti_stix_ids": x_opencti_stix_ids,
                    "update": update,
                }
            },
        )
        return self.opencti.process_multiple_fields(
            result["data"]["markingDefinitionAdd"]
        )

    def update_field(self, **kwargs):
        """Update a Marking Definition object field.

        :param id: the Marking Definition id
        :type id: str
        :param input: the input of the field
        :type input: list
        :return: The updated Marking Definition object
        :rtype: dict or None
        """
        marking_id = kwargs.get("id", None)
        field_input = kwargs.get("input", None)

        if marking_id is None or field_input is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: id and input"
            )
            return None

        self.opencti.app_logger.info(
            "Updating Marking Definition", {"id": marking_id}
        )
        query = """
            mutation MarkingDefinitionEdit($id: ID!, $input: [EditInput]!) {
                markingDefinitionEdit(id: $id) {
                    fieldPatch(input: $input) {
                        id
                        standard_id
                        entity_type
                    }
                }
            }
        """
        result = self.opencti.query(
            query,
            {
                "id": marking_id,
                "input": field_input,
            },
        )
        return self.opencti.process_multiple_fields(
            result["data"]["markingDefinitionEdit"]["fieldPatch"]
        )

    @staticmethod
    def _resolve_definition(stix_object):
        """Derive ``(definition_type, definition)`` from a STIX marking object.

        Resolution order: explicit ``x_opencti_definition_type`` /
        ``x_opencti_definition`` keys, then the ``definition_type`` /
        ``definition`` pair (falling back to ``name``), then ``name`` alone.

        :param stix_object: the STIX marking definition
        :type stix_object: dict
        :return: the resolved pair (``definition`` may be None), or None when
            the object carries none of the recognised keys
        :rtype: tuple or None
        """
        if (
            "x_opencti_definition_type" in stix_object
            and "x_opencti_definition" in stix_object
        ):
            return (
                stix_object["x_opencti_definition_type"],
                stix_object["x_opencti_definition"],
            )

        if "definition_type" in stix_object:
            definition_type = stix_object["definition_type"]
            raw_definition = stix_object.get("definition")
            # ``name`` is the fallback everywhere below; a missing name yields
            # None instead of raising KeyError.
            name = stix_object.get("name")

            if definition_type == "tlp":
                definition_type = definition_type.upper()
                if isinstance(raw_definition, dict) and isinstance(
                    raw_definition.get("tlp"), str
                ):
                    definition = definition_type + ":" + raw_definition["tlp"].upper()
                else:
                    # Malformed or absent ``definition``: use the name.
                    definition = name
            elif isinstance(raw_definition, str):
                definition = raw_definition
            elif (
                isinstance(raw_definition, dict)
                and definition_type in raw_definition
            ):
                definition = raw_definition[definition_type]
            else:
                definition = name
            return definition_type, definition

        if "name" in stix_object:
            name = stix_object["name"]
            # A "TYPE:value" name carries its own type prefix.
            definition_type = name.split(":")[0] if ":" in name else "statement"
            return definition_type, name

        return None

    def import_from_stix2(self, **kwargs):
        """Import a Marking Definition object from a STIX2 object.

        ``TLP:WHITE`` is imported as ``TLP:CLEAR``. Order, color and extra
        STIX IDs missing from the object are looked up through
        ``get_attribute_in_extension`` and written back into ``stixObject``,
        so the dict passed by the caller is modified in place.

        :param stixObject: the Stix-Object Marking Definition
        :type stixObject: dict
        :param update: set the update flag on import
        :type update: bool
        :return: Marking Definition object, or None when ``stixObject`` is
            missing or no definition can be derived from it
        :rtype: dict or None
        :raises KeyError: if ``stixObject`` has no ``id`` key
        """
        stix_object = kwargs.get("stixObject", None)
        update = kwargs.get("update", False)

        if stix_object is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: stixObject"
            )
            return None

        resolved = self._resolve_definition(stix_object)
        if resolved is None:
            return None
        definition_type, definition = resolved

        # Replace TLP:WHITE
        if definition == "TLP:WHITE":
            definition = "TLP:CLEAR"

        # Search in extensions
        if "x_opencti_order" not in stix_object:
            order = self.opencti.get_attribute_in_extension("order", stix_object)
            # Only stored when found, so the default order of 0 still applies.
            if order is not None:
                stix_object["x_opencti_order"] = order
        if "x_opencti_color" not in stix_object:
            stix_object["x_opencti_color"] = (
                self.opencti.get_attribute_in_extension("color", stix_object)
            )
        if "x_opencti_stix_ids" not in stix_object:
            stix_object["x_opencti_stix_ids"] = (
                self.opencti.get_attribute_in_extension("stix_ids", stix_object)
            )

        return self.create(
            stix_id=stix_object["id"],
            created=stix_object.get("created"),
            modified=stix_object.get("modified"),
            definition_type=definition_type,
            definition=definition,
            x_opencti_order=stix_object.get("x_opencti_order", 0),
            x_opencti_color=stix_object.get("x_opencti_color"),
            x_opencti_stix_ids=stix_object.get("x_opencti_stix_ids"),
            update=update,
        )

    def delete(self, **kwargs):
        """Delete a Marking-Definition object.

        :param id: the id of the Marking-Definition to delete
        :type id: str
        :return: None
        """
        marking_id = kwargs.get("id", None)
        if marking_id is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: id"
            )
            return None

        self.opencti.app_logger.info(
            "Deleting Marking-Definition", {"id": marking_id}
        )
        query = """
            mutation MarkingDefinitionEdit($id: ID!) {
                markingDefinitionEdit(id: $id) {
                    delete
                }
            }
        """
        self.opencti.query(query, {"id": marking_id})
        return None