pycti.utils.opencti_stix2
=========================

.. py:module:: pycti.utils.opencti_stix2


Attributes
----------

.. autoapisummary::

   pycti.utils.opencti_stix2.utc
   pycti.utils.opencti_stix2.UTC
   pycti.utils.opencti_stix2.SPEC_VERSION
   pycti.utils.opencti_stix2.ERROR_TYPE_LOCK
   pycti.utils.opencti_stix2.ERROR_TYPE_MISSING_REFERENCE
   pycti.utils.opencti_stix2.ERROR_TYPE_BAD_GATEWAY
   pycti.utils.opencti_stix2.ERROR_TYPE_DRAFT_LOCK
   pycti.utils.opencti_stix2.ERROR_TYPE_WORK_NOT_ALIVE
   pycti.utils.opencti_stix2.ERROR_TYPE_TIMEOUT
   pycti.utils.opencti_stix2.STIX_EXT_OCTI
   pycti.utils.opencti_stix2.STIX_EXT_OCTI_SCO
   pycti.utils.opencti_stix2.STIX_EXT_MITRE
   pycti.utils.opencti_stix2.PROCESSING_COUNT
   pycti.utils.opencti_stix2.MAX_PROCESSING_COUNT
   pycti.utils.opencti_stix2.MARKDOWN_EXPORT_FIELDS
   pycti.utils.opencti_stix2.meter
   pycti.utils.opencti_stix2.bundles_timeout_error_counter
   pycti.utils.opencti_stix2.bundles_lock_error_counter
   pycti.utils.opencti_stix2.bundles_missing_reference_error_counter
   pycti.utils.opencti_stix2.bundles_bad_gateway_error_counter
   pycti.utils.opencti_stix2.bundles_timed_out_error_counter
   pycti.utils.opencti_stix2.bundles_technical_error_counter
   pycti.utils.opencti_stix2.bundles_success_counter


Classes
-------

.. autoapisummary::

   pycti.utils.opencti_stix2.OpenCTIStix2


Module Contents
---------------

.. py:data:: utc

.. py:data:: UTC

.. py:data:: SPEC_VERSION
   :value: '2.1'

.. py:data:: ERROR_TYPE_LOCK
   :value: 'LOCK_ERROR'

.. py:data:: ERROR_TYPE_MISSING_REFERENCE
   :value: 'MISSING_REFERENCE_ERROR'

.. py:data:: ERROR_TYPE_BAD_GATEWAY
   :value: 'Bad Gateway'

.. py:data:: ERROR_TYPE_DRAFT_LOCK
   :value: 'DRAFT_LOCKED'

.. py:data:: ERROR_TYPE_WORK_NOT_ALIVE
   :value: 'WORK_NOT_ALIVE'

.. py:data:: ERROR_TYPE_TIMEOUT
   :value: 'Request timed out'

.. py:data:: STIX_EXT_OCTI
   :type: str
   :value: 'extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba'

.. py:data:: STIX_EXT_OCTI_SCO
   :type: str
   :value: 'extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82'

.. py:data:: STIX_EXT_MITRE
   :type: str
   :value: 'extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b'

.. py:data:: PROCESSING_COUNT
   :type: int
   :value: 4

.. py:data:: MAX_PROCESSING_COUNT
   :type: int
   :value: 100

.. py:data:: MARKDOWN_EXPORT_FIELDS
   :type: Tuple[str, Ellipsis]
   :value: ('description', 'x_opencti_description', 'content')

.. py:data:: meter

.. py:data:: bundles_timeout_error_counter

.. py:data:: bundles_lock_error_counter

.. py:data:: bundles_missing_reference_error_counter

.. py:data:: bundles_bad_gateway_error_counter

.. py:data:: bundles_timed_out_error_counter

.. py:data:: bundles_technical_error_counter

.. py:data:: bundles_success_counter

.. py:class:: OpenCTIStix2(opencti)

   Python API for Stix2 in OpenCTI.

   Handles conversion between STIX2 format and OpenCTI internal format, including import/export operations and bundle processing.

   :param opencti: OpenCTI API client instance
   :type opencti: OpenCTIApiClient

   Initialize the OpenCTIStix2 helper.

   :param opencti: OpenCTI API client instance
   :type opencti: OpenCTIApiClient

   .. py:attribute:: opencti

   .. py:attribute:: stix2_update

   .. py:attribute:: mapping_cache

   .. py:attribute:: mapping_cache_permanent

   .. py:method:: get_in_cache(data_id)

      Get an item from the cache.

      :param data_id: ID of the data to retrieve
      :type data_id: str
      :return: Cached data or None if not found
      :rtype: dict or None

   .. py:method:: set_in_cache(data_id, data)

      Store an item in the cache.

      :param data_id: ID of the data to store
      :type data_id: str
      :param data: Data to cache
      :type data: dict

   .. py:method:: unknown_type(stix_object: Dict) -> None

      Log an error for unknown STIX object types.

      :param stix_object: STIX object with unknown type
      :type stix_object: Dict

   .. py:method:: convert_markdown(text: str) -> str

      Convert input text to markdown style code annotation.

      :param text: Input text to convert
      :type text: str
      :return: Sanitized text with markdown style code annotation
      :rtype: str

   .. py:method:: format_date(date: Any = None) -> str

      Convert multiple input date formats to OpenCTI style dates.

      :param date: Input date (datetime, date, str or None)
      :type date: Any
      :return: ISO 8601 formatted date string
      :rtype: str

   .. py:method:: filter_objects(uuids: List, objects: List) -> List

      Filter objects based on UUIDs.

      :param uuids: List of UUIDs to filter by
      :type uuids: list
      :param objects: List of objects to filter
      :type objects: list
      :return: List of filtered objects not in the uuids list
      :rtype: list

   .. py:method:: pick_aliases(stix_object: Dict) -> Optional[List]

      Check STIX2 object for multiple aliases and return a list.

      :param stix_object: Valid STIX2 object
      :type stix_object: Dict
      :return: List of aliases or None if no aliases found
      :rtype: list or None

   .. py:method:: import_bundle_from_file(file_path: str, update: bool = False, types: List = None) -> Optional[Tuple[list, list]]

      Import a STIX2 bundle from a file.

      :param file_path: Valid path to the file
      :type file_path: str
      :param update: Whether to update data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional
      :return: Tuple of (imported objects, failed objects) or None if file not found
      :rtype: Tuple[list, list] or None

   .. py:method:: import_bundle_from_json(json_data: Union[str, bytes], update: bool = False, types: List = None, work_id: str = None, objects_max_refs: int = 0) -> Tuple[list, list]

      Import a STIX2 bundle from JSON data.

      :param json_data: JSON data as string or bytes
      :type json_data: str or bytes
      :param update: Whether to update data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional
      :param work_id: Work ID for tracking import progress
      :type work_id: str, optional
      :param objects_max_refs: Maximum object references; rejects import if exceeded
      :type objects_max_refs: int, optional
      :return: Tuple of (imported objects, objects with too many dependencies)
      :rtype: Tuple[list, list]

   .. py:method:: resolve_author(title: str) -> Optional[pycti.entities.opencti_identity.Identity]

      Resolve an author identity from a title string.

      :param title: Title to search for known author names
      :type title: str
      :return: Identity object if author found, None otherwise
      :rtype: Identity or None

   .. py:method:: get_author(name: str) -> pycti.entities.opencti_identity.Identity

      Get or create an author identity by name.

      :param name: Name of the author organization
      :type name: str
      :return: Identity object for the author
      :rtype: Identity

   .. py:method:: extract_embedded_relationships(stix_object: Dict, types: List = None) -> Dict

      Extract embedded relationship objects from a STIX2 entity.

      :param stix_object: Valid STIX2 object
      :type stix_object: Dict
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional
      :return: Dictionary containing embedded relationships and references
      :rtype: dict

   .. py:method:: get_readers()

      Get a dictionary mapping entity types to their read methods.

      :return: Dictionary mapping entity types to read functions
      :rtype: dict

   .. py:method:: get_reader(entity_type: str)

      Get the appropriate reader function for a given entity type.

      :param entity_type: Type of the entity
      :type entity_type: str
      :return: Reader function for the entity type
      :rtype: callable or None

   .. py:method:: get_stix_helper()

      Get a dictionary mapping STIX types to their helper functions.

      :return: Dictionary mapping STIX types to generate_id functions
      :rtype: dict

   .. py:method:: get_internal_helper()

      Get a dictionary mapping internal types to their helper functions.

      :return: Dictionary mapping internal types to generate_id functions
      :rtype: dict

   .. py:method:: generate_standard_id_from_stix(data)

      Generate a standard ID from STIX data.

      :param data: STIX data dictionary
      :type data: dict
      :return: Generated standard ID or None
      :rtype: str or None

   .. py:method:: import_object(stix_object: Dict, update: bool = False, types: List = None) -> Optional[List]

      Import a STIX2 object into OpenCTI.

      :param stix_object: Valid STIX2 object to import
      :type stix_object: Dict
      :param update: Whether to update data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional
      :return: List of imported STIX2 objects or None on failure
      :rtype: list or None

   .. py:method:: import_observable(stix_object: Dict, update: bool = False, types: List = None) -> None

      Import a STIX cyber observable into OpenCTI.

      :param stix_object: Valid STIX2 cyber observable object
      :type stix_object: Dict
      :param update: Whether to update existing data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional

   .. py:method:: import_relationship(stix_relation: Dict, update: bool = False, types: List = None) -> None

      Import a STIX core relationship into OpenCTI.

      :param stix_relation: Valid STIX2 relationship object
      :type stix_relation: Dict
      :param update: Whether to update existing data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional

   .. py:method:: import_sighting(stix_sighting: Dict, from_id: str, to_id: str, update: bool = False, types: List = None) -> None

      Import a STIX sighting relationship into OpenCTI.

      :param stix_sighting: Valid STIX2 sighting object
      :type stix_sighting: Dict
      :param from_id: ID of the source entity (sighting_of_ref)
      :type from_id: str
      :param to_id: ID of the target entity (where_sighted_ref)
      :type to_id: str
      :param update: Whether to update existing data in the database, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: list, optional

   .. py:method:: generate_export(entity: Dict, no_custom_attributes: bool = False) -> Dict

      Generate a STIX2 export from an OpenCTI entity.

      :param entity: OpenCTI entity dictionary to export
      :type entity: Dict
      :param no_custom_attributes: Whether to exclude custom x_opencti attributes, defaults to False
      :type no_custom_attributes: bool, optional
      :return: STIX2 formatted entity dictionary
      :rtype: Dict

   .. py:method:: prepare_id_filters_export(entity_id: Union[str, List[str]], access_filter: Dict = None) -> Dict
      :staticmethod:

      Prepare filter configuration for entity ID-based export queries.

      :param entity_id: Single entity ID or list of entity IDs to filter
      :type entity_id: Union[str, List[str]]
      :param access_filter: Additional access filter to combine, defaults to None
      :type access_filter: Dict, optional
      :return: Filter configuration dictionary for API queries
      :rtype: Dict

   .. py:method:: prepare_export(entity: Dict, mode: str = 'simple', access_filter: Dict = None, no_custom_attributes: bool = False) -> List

      Prepare an entity for STIX2 export with related objects.

      :param entity: Entity dictionary to prepare for export
      :type entity: Dict
      :param mode: Export mode - 'simple' for entity only, 'full' for entity with relations
      :type mode: str
      :param access_filter: Access filter for the export, defaults to None
      :type access_filter: Dict, optional
      :param no_custom_attributes: Whether to exclude custom attributes, defaults to False
      :type no_custom_attributes: bool, optional
      :return: List of STIX2 objects ready for export
      :rtype: List

   .. py:method:: get_stix_bundle_or_object_from_entity_id(entity_type: str, entity_id: str, mode: str = 'simple', access_filter: Dict = None, no_custom_attributes: bool = False, only_entity: bool = False) -> Dict

      Get a STIX2 bundle or single object from an entity ID.

      :param entity_type: Type of the entity to export
      :type entity_type: str
      :param entity_id: ID of the entity to export
      :type entity_id: str
      :param mode: Export mode - 'simple' or 'full', defaults to 'simple'
      :type mode: str
      :param access_filter: Access filter for the export, defaults to None
      :type access_filter: Dict, optional
      :param no_custom_attributes: Whether to exclude custom attributes, defaults to False
      :type no_custom_attributes: bool, optional
      :param only_entity: If True, return only the entity object instead of a bundle
      :type only_entity: bool, optional
      :return: STIX2 bundle dictionary or single STIX2 object if only_entity is True
      :rtype: Dict

   .. py:method:: export_entity(entity_type: str, entity_id: str, mode: str = 'simple', access_filter: Dict = None, no_custom_attributes: bool = False, only_entity: bool = False) -> Dict

      Export an entity as a STIX2 bundle.

      .. deprecated::
         Use :meth:`get_stix_bundle_or_object_from_entity_id` instead.

      :param entity_type: Type of the entity to export
      :type entity_type: str
      :param entity_id: ID of the entity to export
      :type entity_id: str
      :param mode: Export mode - 'simple' or 'full', defaults to 'simple'
      :type mode: str
      :param access_filter: Access filter for the export, defaults to None
      :type access_filter: Dict, optional
      :param no_custom_attributes: Whether to exclude custom attributes, defaults to False
      :type no_custom_attributes: bool, optional
      :param only_entity: If True, return only the entity object instead of a bundle
      :type only_entity: bool, optional
      :return: STIX2 bundle dictionary or single STIX2 object
      :rtype: Dict

   .. py:method:: export_entities_list(entity_type: str, search: Dict = None, filters: Dict = None, orderBy: str = None, orderMode: str = None, getAll: bool = True, withFiles: bool = False) -> List[Dict]

      List entities for export based on type and filters.

      :param entity_type: Type of entities to list
      :type entity_type: str
      :param search: Search parameters, defaults to None
      :type search: Dict, optional
      :param filters: Filter parameters, defaults to None
      :type filters: Dict, optional
      :param orderBy: Field to order results by, defaults to None
      :type orderBy: str, optional
      :param orderMode: Order direction ('asc' or 'desc'), defaults to None
      :type orderMode: str, optional
      :param getAll: Whether to get all results, defaults to True
      :type getAll: bool, optional
      :param withFiles: Whether to include files in the export, defaults to False
      :type withFiles: bool, optional
      :return: List of entity dictionaries
      :rtype: List[Dict]

   .. py:method:: export_list(entity_type: str, search: Dict = None, filters: Dict = None, order_by: str = None, order_mode: str = None, mode: str = 'simple', access_filter: Dict = None) -> Dict

      Export a list of entities as a STIX2 bundle.

      :param entity_type: Type of entities to export
      :type entity_type: str
      :param search: Search parameters, defaults to None
      :type search: Dict, optional
      :param filters: Filter parameters, defaults to None
      :type filters: Dict, optional
      :param order_by: Field to order results by, defaults to None
      :type order_by: str, optional
      :param order_mode: Order direction ('asc' or 'desc'), defaults to None
      :type order_mode: str, optional
      :param mode: Export mode - 'simple' or 'full', defaults to 'simple'
      :type mode: str
      :param access_filter: Access filter for the export, defaults to None
      :type access_filter: Dict, optional
      :return: STIX2 bundle containing all exported entities
      :rtype: Dict

   .. py:method:: export_selected(entities_list: List[dict], mode: str = 'simple', access_filter: Dict = None) -> Dict

      Export selected entities as a STIX2 bundle.

      :param entities_list: List of entities to export
      :type entities_list: List[dict]
      :param mode: Export mode ('simple' or 'full'), defaults to 'simple'
      :type mode: str
      :param access_filter: Access filter for the export
      :type access_filter: Dict
      :return: STIX2 bundle containing exported entities
      :rtype: Dict

   .. py:method:: apply_patch_files(item)

      Apply file patches to an item.

      :param item: Item containing file patch operations
      :type item: dict

   .. py:method:: apply_patch(item)

      Apply field patches to an item.

      :param item: Item containing field patch operations
      :type item: dict

   .. py:method:: rule_apply(item, bundle_id)

      Apply a rule to an item.

      :param item: Item to apply the rule to
      :type item: dict

   .. py:method:: rule_clear(item)

      Clear a rule from an item.

      :param item: Item to clear the rule from
      :type item: dict

   .. py:method:: rules_rescan(item, bundle_id)

      Rescan rules for an item.

      :param item: Item to rescan rules for
      :type item: dict

   .. py:method:: organization_share(item)

      Share an item with organizations.

      :param item: Item to share
      :type item: dict

   .. py:method:: organization_unshare(item)

      Unshare an item from organizations.

      :param item: Item to unshare
      :type item: dict

   .. py:method:: element_add_organizations(item)

      Add organizations to an element.

      :param item: Item to add organizations to
      :type item: dict
      :raises ValueError: If the operation is not compatible with the item type

   .. py:method:: element_remove_organizations(item)

      Remove organizations from an element.

      :param item: Item to remove organizations from
      :type item: dict
      :raises ValueError: If the operation is not compatible with the item type

   .. py:method:: element_add_groups(item)

      Add groups to an element.

      :param item: Item to add groups to
      :type item: dict
      :raises ValueError: If the operation is not compatible with the item type

   .. py:method:: element_remove_groups(item)

      Remove groups from an element.

      :param item: Item to remove groups from
      :type item: dict
      :raises ValueError: If the operation is not compatible with the item type

   .. py:method:: send_email(item)

      Send an email for an item.

      :param item: Item to send email for
      :type item: dict
      :raises ValueError: If the operation is not supported for the item type

   .. py:method:: enroll_playbook(item)

   .. py:method:: element_operation_delete(item, operation)

      Delete an element.

      :param item: Item to delete
      :type item: dict
      :param operation: Delete operation type ('delete' or 'delete_force')
      :type operation: str
      :raises ValueError: If the delete operation fails or helper not found

   .. py:method:: element_remove_from_draft(item)

      Remove an element from draft.

      :param item: Item to remove from draft
      :type item: dict

   .. py:method:: apply_opencti_operation(item, operation, bundle_id)

      Apply an OpenCTI operation to an item.

      :param item: Item to apply the operation to
      :type item: dict
      :param operation: Operation to apply (delete, restore, merge, patch, etc.)
      :type operation: str
      :raises ValueError: If the operation is not supported

   .. py:method:: import_item(item, update: bool = False, types: List = None, work_id: str = None, bundle_id: str = None)

      Import a single STIX2 item into OpenCTI.

      :param item: STIX2 item to import
      :type item: dict
      :param update: Whether to update existing data, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: List, optional
      :param work_id: Work ID for tracking import progress, defaults to None
      :type work_id: str, optional
      :return: True on success
      :rtype: bool

   .. py:method:: import_item_with_retries(item, update: bool = False, types: List = None, work_id: str = None, bundle_id: str = None)

      Import a single STIX2 item with automatic retry on failures.

      Handles various error types including timeouts, lock errors, missing references, and bad gateway errors with appropriate retry strategies.

      :param item: STIX2 item to import
      :type item: dict
      :param update: Whether to update existing data, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: List, optional
      :param work_id: Work ID for tracking import progress, defaults to None
      :type work_id: str, optional
      :return: None on success, the failed item on permanent failure
      :rtype: dict or None

   .. py:method:: import_bundle(stix_bundle: Dict, update: bool = False, types: List = None, work_id: str = None, objects_max_refs: int = 0) -> Tuple[list, list]

      Import a complete STIX2 bundle into OpenCTI.

      :param stix_bundle: STIX2 bundle dictionary to import
      :type stix_bundle: Dict
      :param update: Whether to update existing data, defaults to False
      :type update: bool, optional
      :param types: List of STIX2 types to filter, defaults to None
      :type types: List, optional
      :param work_id: Work ID for tracking import progress, defaults to None
      :type work_id: str, optional
      :param objects_max_refs: Maximum number of object references allowed; objects exceeding this limit will be rejected. Set to 0 to disable the limit.
      :type objects_max_refs: int, optional
      :return: Tuple of (list of successfully imported elements, list of failed/too-large elements)
      :rtype: Tuple[list, list]
      :raises ValueError: If the bundle is not properly formatted or empty

   .. py:method:: put_attribute_in_extension(stix_object, extension_id, key, value, multiple=False) -> any
      :staticmethod:

      Add or update an attribute in a STIX object's extension.

      :param stix_object: STIX object to modify
      :type stix_object: dict
      :param extension_id: ID of the extension to add the attribute to
      :type extension_id: str
      :param key: Attribute key name
      :type key: str
      :param value: Attribute value to set
      :type value: any
      :param multiple: If True, append value to a list; if False, replace the value
      :type multiple: bool
      :return: Modified STIX object
      :rtype: dict
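
A pre-flight check for bundles received from an untrusted feed, following the contract documented above for ``import_bundle`` and ``import_bundle_from_json`` (``ValueError`` on a malformed or empty bundle, objects over ``objects_max_refs`` set aside, ``0`` disabling the limit). This is an added example, not pycti code: ``count_refs``, ``preflight_bundle`` and ``SAMPLE_BUNDLE`` are hypothetical names, and counting only top-level ``*_ref`` / ``*_refs`` properties is an assumption, since the page does not say how pycti counts references.

```python
import json
from typing import Dict, List, Tuple, Union


def count_refs(stix_object: Dict) -> int:
    """Count top-level STIX reference properties (assumed counting rule)."""
    total = 0
    for key, value in stix_object.items():
        if key.endswith("_refs") and isinstance(value, list):
            total += len(value)
        elif key.endswith("_ref") and isinstance(value, str):
            total += 1
    return total


def preflight_bundle(
    json_data: Union[str, bytes], objects_max_refs: int = 0
) -> Tuple[List[Dict], List[Dict]]:
    """Return (objects within the limit, objects over the limit).

    Raises ValueError when the input is not a non-empty STIX bundle.
    """
    try:
        bundle = json.loads(json_data)
    except (TypeError, ValueError) as exc:
        raise ValueError("input is not valid JSON") from exc
    if not isinstance(bundle, dict) or bundle.get("type") != "bundle":
        raise ValueError("JSON data is not a STIX bundle")
    objects = bundle.get("objects")
    if not isinstance(objects, list) or not objects:
        raise ValueError("bundle has no objects")

    accepted, too_large = [], []
    for obj in objects:
        if not isinstance(obj, dict) or "type" not in obj or "id" not in obj:
            raise ValueError("bundle object lacks 'type' or 'id'")
        # A limit of 0 disables the check, as documented for objects_max_refs.
        if objects_max_refs > 0 and count_refs(obj) > objects_max_refs:
            too_large.append(obj)
        else:
            accepted.append(obj)
    return accepted, too_large


# Constructed sample input: one small indicator and one report with 6 references.
_AUTHOR = "identity--00000000-0000-4000-8000-000000000003"
SAMPLE_BUNDLE = json.dumps({
    "type": "bundle",
    "id": "bundle--00000000-0000-4000-8000-000000000001",
    "objects": [
        {"type": "indicator",
         "id": "indicator--00000000-0000-4000-8000-000000000002",
         "created_by_ref": _AUTHOR},
        {"type": "report",
         "id": "report--00000000-0000-4000-8000-000000000004",
         "created_by_ref": _AUTHOR,
         "object_refs": [f"indicator--00000000-0000-4000-8000-00000000010{i}"
                         for i in range(5)]},
    ],
})

if __name__ == "__main__":
    ok, rejected = preflight_bundle(SAMPLE_BUNDLE, objects_max_refs=3)
    print([o["type"] for o in ok], [o["type"] for o in rejected])
```

Running the check before the import keeps oversized or malformed feed content out of the retry loop and gives the caller a list of rejected objects to log or quarantine.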