import json
import uuid
from typing import Tuple
from typing_extensions import deprecated
from pycti.utils.opencti_stix2_identifier import (
external_reference_generate_id,
kill_chain_phase_generate_id,
)
from pycti.utils.opencti_stix2_utils import (
STIX_CYBER_OBSERVABLE_MAPPING,
SUPPORTED_INTERNAL_OBJECTS,
SUPPORTED_STIX_ENTITY_OBJECTS,
)
# Key of the OpenCTI extension definition inside a STIX object's
# ``extensions`` dict.
OPENCTI_EXTENSION = "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"

# STIX type prefixes the splitter will enlist, in this order: entity objects,
# internal objects, cyber observables, the relationship types, then "pir".
supported_types = [
    *SUPPORTED_STIX_ENTITY_OBJECTS,
    *SUPPORTED_INTERNAL_OBJECTS,
    *STIX_CYBER_OBSERVABLE_MAPPING.keys(),
    "relationship",
    "sighting",
    "pir",
]
def is_id_supported(key):
    """Tell whether an identifier's STIX type can be processed.

    Identifiers that are not STIX IDs (no ``--`` separator) are never
    filtered out.

    :param key: STIX ID (``<type>--<suffix>``) or any other identifier
    :type key: str
    :return: True when *key* is not a STIX ID or its type prefix is listed in
        ``supported_types``, False otherwise
    :rtype: bool
    """
    if "--" not in key:
        # Not a STIX ID: nothing to filter on.
        return True
    id_type, _separator, _rest = key.partition("--")
    return id_type in supported_types
class OpenCTIStix2Splitter:
    """STIX2 bundle splitter for OpenCTI.

    Splits large STIX2 bundles into smaller chunks for processing,
    handling dependencies between objects and deduplicating references.

    Instances are stateful: the caches filled while splitting are never
    reset, so use a fresh splitter for each bundle.
    """

    def __init__(self):
        """Initialize the STIX2 bundle splitter.

        Sets up internal caches for tracking processed elements,
        references, and incompatible items.
        """
        # item ID (STIX or OpenCTI internal) -> item already enlisted
        self.cache_index = {}
        # item ID -> list of reference IDs followed from that item
        self.cache_refs = {}
        # enlisted items that passed the compatibility check
        self.elements = []
        # enlisted items that failed the compatibility check
        self.incompatible_items = []

    def get_internal_ids_in_extension(self, item):
        """Get internal IDs from OpenCTI extensions in a STIX object.

        Collects ``x_opencti_id`` and the ``id`` stored under the OpenCTI
        extension definition, each only when present and truthy.

        :param item: the STIX object to extract IDs from
        :type item: dict
        :return: list of internal IDs found in extensions
        :rtype: list
        """
        ids = []
        if item.get("x_opencti_id"):
            ids.append(item["x_opencti_id"])
        extension = (item.get("extensions") or {}).get(OPENCTI_EXTENSION)
        if extension and extension.get("id"):
            ids.append(extension["id"])
        return ids

    def _can_follow_ref(
        self, item_id, ref, raw_data, cleanup_inconsistent_bundle, parent_acc
    ):
        """Tell whether reference *ref* found on item *item_id* is kept and enlisted."""
        if ref is None:
            return False
        # Dangling references are only dropped when cleanup is requested.
        if cleanup_inconsistent_bundle and raw_data.get(ref) is None:
            return False
        # Self reference, or circular reference along the current recursion path.
        if ref == item_id or ref in parent_acc:
            return False
        # The referenced item already depends on this one: following it back loops.
        back_refs = self.cache_refs.get(ref)
        if back_refs is not None and item_id in back_refs:
            return False
        return is_id_supported(ref)

    def _follow_ref(
        self, item_id, ref, raw_data, cleanup_inconsistent_bundle, parent_acc
    ):
        """Record *ref* as a dependency of *item_id* and enlist it; return its dep count."""
        self.cache_refs[item_id].append(ref)
        return self.enlist_element(
            ref, raw_data, cleanup_inconsistent_bundle, parent_acc + [ref]
        )

    @staticmethod
    def _deduplicate_embedded(entries, generate_id):
        """Keep the first entry per generated ID; drop entries whose ID is None."""
        seen_ids = set()
        unique_entries = []
        for entry in entries:
            entry_id = generate_id(entry)
            if entry_id is not None and entry_id not in seen_ids:
                seen_ids.add(entry_id)
                unique_entries.append(entry)
        return unique_entries

    @staticmethod
    def _is_compatible(item_id, item):
        """Tell whether an enlisted item can be sent for processing.

        A relationship needs both ``source_ref`` and ``target_ref``. A sighting
        needs ``sighting_of_ref`` and a non-empty ``where_sighted_refs``. Any
        other item needs a supported ID type.
        """
        item_type = item["type"]
        if item_type == "relationship":
            # .get: a relationship missing one of the keys is incompatible, not a crash
            return (
                item.get("source_ref") is not None
                and item.get("target_ref") is not None
            )
        if item_type == "sighting":
            # ``where_sighted_refs`` may be present but None
            return (
                item.get("sighting_of_ref") is not None
                and len(item.get("where_sighted_refs") or []) > 0
            )
        return is_id_supported(item_id)

    def enlist_element(
        self, item_id, raw_data, cleanup_inconsistent_bundle, parent_acc
    ):
        """Enlist an element and its dependencies for processing.

        Recursively follows every ``*_ref`` / ``*_refs`` property. It prunes
        references that cannot be followed: missing ones (when cleanup is
        enabled), unsupported ones, self or circular ones, and ones whose
        target already points back at this item. It then deduplicates
        ``external_references`` and ``kill_chain_phases`` and records the item
        in ``elements`` or ``incompatible_items``. The item dict taken from
        *raw_data* is modified in place: refs are pruned, the lists are
        deduplicated and an ``nb_deps`` key is added.

        :param item_id: the ID of the item to enlist
        :type item_id: str
        :param raw_data: mapping of every STIX / internal ID to its item
        :type raw_data: dict
        :param cleanup_inconsistent_bundle: whether to drop references to IDs
            absent from *raw_data*
        :type cleanup_inconsistent_bundle: bool
        :param parent_acc: IDs on the current recursion path, used to prevent
            circular references
        :type parent_acc: list
        :return: number of dependencies enlisted, counting the item itself;
            0 when *item_id* is not in *raw_data*, and the cached count when
            the item was already enlisted
        :rtype: int
        """
        if item_id not in raw_data:
            return 0
        existing_item = self.cache_index.get(item_id)
        if existing_item is not None:
            return existing_item["nb_deps"]

        item = raw_data[item_id]
        self.cache_refs.setdefault(item_id, [])
        nb_deps = 1  # the item itself

        for key in list(item.keys()):
            value = item[key]
            if key.endswith("_refs") and value is not None:
                kept_refs = []
                for ref in value:
                    if self._can_follow_ref(
                        item_id, ref, raw_data, cleanup_inconsistent_bundle, parent_acc
                    ):
                        nb_deps += self._follow_ref(
                            item_id, ref, raw_data, cleanup_inconsistent_bundle, parent_acc
                        )
                        if ref not in kept_refs:
                            kept_refs.append(ref)
                item[key] = kept_refs
            elif key.endswith("_ref"):
                if self._can_follow_ref(
                    item_id, value, raw_data, cleanup_inconsistent_bundle, parent_acc
                ):
                    nb_deps += self._follow_ref(
                        item_id, value, raw_data, cleanup_inconsistent_bundle, parent_acc
                    )
                else:
                    item[key] = None
            # Embedded entries are only deduplicated; they stay inside the item.
            elif key == "external_references" and value is not None:
                item[key] = self._deduplicate_embedded(
                    value,
                    lambda reference: external_reference_generate_id(
                        url=reference.get("url"),
                        source_name=reference.get("source_name"),
                        external_id=reference.get("external_id"),
                    ),
                )
            elif key == "kill_chain_phases" and value is not None:
                item[key] = self._deduplicate_embedded(
                    value,
                    lambda phase: kill_chain_phase_generate_id(
                        kill_chain_name=phase.get("kill_chain_name"),
                        phase_name=phase.get("phase_name"),
                    ),
                )

        item["nb_deps"] = nb_deps
        # Recursion can reach this item again only through a cycle, so check the cache again.
        if self.cache_index.get(item_id) is None:
            if self._is_compatible(item_id, item):
                self.elements.append(item)
            else:
                self.incompatible_items.append(item)
            self.cache_index[item_id] = item
            for internal_id in self.get_internal_ids_in_extension(item):
                self.cache_index[internal_id] = item
        return nb_deps

    def split_bundle_with_expectations(
        self,
        bundle,
        use_json=True,
        event_version=None,
        cleanup_inconsistent_bundle=False,
    ) -> Tuple[int, list, list]:
        """Split a valid STIX2 bundle into a list of bundles.

        Each compatible element becomes its own bundle. Bundles are ordered by
        ascending dependency count, and the sort is stable, so ties keep
        enlist order. When *use_json* is False and the bundle has no ``id``,
        a generated one is written into the caller's dict.

        :param bundle: the STIX2 bundle to split
        :type bundle: str or dict
        :param use_json: whether the bundle is JSON string (True) or dict (False)
        :type use_json: bool
        :param event_version: (optional) event version to include in bundles
        :type event_version: str or None
        :param cleanup_inconsistent_bundle: whether to cleanup inconsistent references
        :type cleanup_inconsistent_bundle: bool
        :return: tuple of (number of expectations, incompatible items, list of
            bundles); the number of expectations equals the number of bundles
        :rtype: Tuple[int, list, list]
        :raises Exception: if the input is not valid JSON or has no ``objects``
        """
        if use_json:
            try:
                bundle_data = json.loads(bundle)
            except json.JSONDecodeError as e:
                raise Exception(f"File data is not a valid JSON: {e}") from e
        else:
            bundle_data = bundle
        if "objects" not in bundle_data:
            raise Exception("File data is not a valid bundle")
        if "id" not in bundle_data:
            bundle_data["id"] = "bundle--" + str(uuid.uuid4())

        # Index every object by its STIX ID and by its OpenCTI internal IDs.
        raw_data = {}
        for item in bundle_data["objects"]:
            raw_data[item["id"]] = item
            for internal_id in self.get_internal_ids_in_extension(item):
                raw_data[internal_id] = item
        for item in bundle_data["objects"]:
            self.enlist_element(item["id"], raw_data, cleanup_inconsistent_bundle, [])

        self.elements.sort(key=lambda element: element["nb_deps"])
        # One bundle per element, sequenced by that element's dependency count.
        bundles = [
            self.stix2_create_bundle(
                bundle_data["id"],
                element["nb_deps"],
                [element],
                use_json,
                event_version,
            )
            for element in self.elements
        ]
        return len(self.elements), self.incompatible_items, bundles

    @deprecated("Use split_bundle_with_expectations instead")
    def split_bundle(self, bundle, use_json=True, event_version=None) -> list:
        """Split a valid STIX2 bundle into a list of bundles.

        .. deprecated::
            Use :meth:`split_bundle_with_expectations` instead.

        :param bundle: the STIX2 bundle to split
        :type bundle: str or dict
        :param use_json: whether the bundle is JSON string (True) or dict (False)
        :type use_json: bool
        :param event_version: (optional) event version to include in bundles
        :type event_version: str or None
        :return: list of STIX2 bundles
        :rtype: list
        """
        _, _, bundles = self.split_bundle_with_expectations(
            bundle, use_json, event_version
        )
        return bundles

    @staticmethod
    def stix2_create_bundle(bundle_id, bundle_seq, items, use_json, event_version=None):
        """Create a STIX2 bundle with items.

        :param bundle_id: the bundle ID
        :type bundle_id: str
        :param bundle_seq: the bundle sequence number
        :type bundle_seq: int
        :param items: valid STIX2 items
        :type items: list
        :param use_json: whether to return JSON string (True) or dict (False)
        :type use_json: bool
        :param event_version: (optional) event version to include
        :type event_version: str or None
        :return: STIX2 bundle as JSON string or dict
        :rtype: str or dict
        """
        bundle = {
            "type": "bundle",
            "id": bundle_id,
            "spec_version": "2.1",
            "x_opencti_seq": bundle_seq,
            "objects": items,
        }
        if event_version is not None:
            bundle["x_opencti_event_version"] = event_version
        return json.dumps(bundle) if use_json else bundle