Source code for pycti.entities.opencti_kill_chain_phase

# coding: utf-8

import json

from pycti.utils.opencti_stix2_identifier import kill_chain_phase_generate_id


[docs] class KillChainPhase: """Main KillChainPhase class for OpenCTI Manages kill chain phases (ATT&CK tactics) in the OpenCTI platform. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """
[docs] def __init__(self, opencti): """Initialize the KillChainPhase instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types kill_chain_name phase_name x_opencti_order created modified created_at updated_at """
@staticmethod
[docs] def generate_id(phase_name, kill_chain_name): """Generate a STIX ID for a Kill Chain Phase. :param phase_name: The phase name :type phase_name: str :param kill_chain_name: The kill chain name :type kill_chain_name: str :return: STIX ID for the kill chain phase :rtype: str """ return kill_chain_phase_generate_id( phase_name=phase_name, kill_chain_name=kill_chain_name )
@staticmethod
[docs] def generate_id_from_data(data): """Generate a STIX ID from kill chain phase data. :param data: Dictionary containing 'phase_name' and 'kill_chain_name' keys :type data: dict :return: STIX ID for the kill chain phase :rtype: str """ return KillChainPhase.generate_id(data["phase_name"], data["kill_chain_name"])
[docs] def list(self, **kwargs): """List Kill-Chain-Phase objects. :param filters: the filters to apply :type filters: dict :param first: return the first n rows from the after ID (or the beginning if not set) :type first: int :param after: ID of the first row for pagination :type after: str :param orderBy: field to order results by :type orderBy: str :param orderMode: ordering mode (asc/desc) :type orderMode: str :param customAttributes: custom attributes to return :type customAttributes: list :param withPagination: whether to include pagination info :type withPagination: bool :return: List of Kill-Chain-Phase objects :rtype: list """ filters = kwargs.get("filters", None) first = kwargs.get("first", 500) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) with_pagination = kwargs.get("withPagination", False) self.opencti.app_logger.info( "Listing Kill-Chain-Phase with filters", {"filters": json.dumps(filters)} ) query = ( """ query KillChainPhases($filters: FilterGroup, $first: Int, $after: ID, $orderBy: KillChainPhasesOrdering, $orderMode: OrderingMode) { killChainPhases(filters: $filters, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { edges { node { """ + (custom_attributes if custom_attributes is not None else self.properties) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "filters": filters, "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, }, ) return self.opencti.process_multiple( result["data"]["killChainPhases"], with_pagination )
[docs] def read(self, **kwargs): """Read a Kill-Chain-Phase object. :param id: the id of the Kill-Chain-Phase :type id: str :param filters: the filters to apply if no id provided :type filters: dict :return: Kill-Chain-Phase object :rtype: dict or None """ id = kwargs.get("id", None) filters = kwargs.get("filters", None) if id is not None: self.opencti.app_logger.info("Reading Kill-Chain-Phase", {"id": id}) query = ( """ query KillChainPhase($id: String!) { killChainPhase(id: $id) { """ + self.properties + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields( result["data"]["killChainPhase"] ) elif filters is not None: result = self.list(filters=filters) if len(result) > 0: return result[0] else: return None else: self.opencti.app_logger.error( "[opencti_kill_chain_phase] Missing parameters: id or filters" ) return None
[docs] def create(self, **kwargs): """Create a Kill-Chain-Phase object. :param stix_id: (optional) the STIX ID :type stix_id: str :param created: (optional) creation date :type created: datetime :param modified: (optional) modification date :type modified: datetime :param kill_chain_name: the kill chain name (required) :type kill_chain_name: str :param phase_name: the phase name (required) :type phase_name: str :param x_opencti_order: (optional) order (default: 0) :type x_opencti_order: int :param update: (optional) whether to update if exists (default: False) :type update: bool :return: Kill-Chain-Phase object :rtype: dict or None """ stix_id = kwargs.get("stix_id", None) created = kwargs.get("created", None) modified = kwargs.get("modified", None) kill_chain_name = kwargs.get("kill_chain_name", None) phase_name = kwargs.get("phase_name", None) x_opencti_order = kwargs.get("x_opencti_order", 0) update = kwargs.get("update", False) if kill_chain_name is not None and phase_name is not None: self.opencti.app_logger.info( "Creating Kill-Chain-Phase", {"name": phase_name} ) query = ( """ mutation KillChainPhaseAdd($input: KillChainPhaseAddInput!) { killChainPhaseAdd(input: $input) { """ + self.properties + """ } } """ ) result = self.opencti.query( query, { "input": { "stix_id": stix_id, "created": created, "modified": modified, "kill_chain_name": kill_chain_name, "phase_name": phase_name, "x_opencti_order": x_opencti_order, "update": update, } }, ) return self.opencti.process_multiple_fields( result["data"]["killChainPhaseAdd"] ) else: self.opencti.app_logger.error( "[opencti_kill_chain_phase] Missing parameters: kill_chain_name and phase_name", ) return None
[docs] def update_field(self, **kwargs): """Update a Kill Chain Phase object field. :param id: the Kill Chain Phase id :type id: str :param input: the input of the field :type input: list :return: The updated Kill Chain Phase object :rtype: dict or None """ id = kwargs.get("id", None) input = kwargs.get("input", None) if id is not None and input is not None: self.opencti.app_logger.info("Updating Kill chain", {"id": id}) query = """ mutation KillChainPhaseEdit($id: ID!, $input: [EditInput]!) { killChainPhaseEdit(id: $id) { fieldPatch(input: $input) { id standard_id entity_type } } } """ result = self.opencti.query( query, { "id": id, "input": input, }, ) return self.opencti.process_multiple_fields( result["data"]["killChainPhaseEdit"]["fieldPatch"] ) else: self.opencti.app_logger.error( "[opencti_kill_chain_phase] Missing parameters: id and input" ) return None
[docs] def delete(self, **kwargs): """Delete a Kill-Chain-Phase object. :param id: the id of the Kill-Chain-Phase to delete :type id: str :return: None """ id = kwargs.get("id", None) if id is not None: self.opencti.app_logger.info("Deleting Kill-Chain-Phase", {"id": id}) query = """ mutation KillChainPhaseEdit($id: ID!) { killChainPhaseEdit(id: $id) { delete } } """ self.opencti.query(query, {"id": id}) else: self.opencti.app_logger.error( "[opencti_kill_chain_phase] Missing parameters: id" ) return None