import base64
import datetime
import json
import mimetypes
import os
import random
import re
import time
import traceback
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import quote, unquote, urljoin, urlparse
import datefinder
import dateutil.parser
import pytz
from cachetools import LRUCache
from opentelemetry import metrics
from requests import RequestException, Timeout
from typing_extensions import deprecated
from pycti.entities.opencti_identity import Identity
from pycti.utils.constants import (
IdentityTypes,
LocationTypes,
MultipleRefRelationship,
StixCyberObservableTypes,
ThreatActorTypes,
)
from pycti.utils.opencti_stix2_markdown_embedded_file_utils import (
rewrite_markdown_images,
)
from pycti.utils.opencti_stix2_splitter import OpenCTIStix2Splitter
from pycti.utils.opencti_stix2_update import OpenCTIStix2Update
from pycti.utils.opencti_stix2_utils import (
OBSERVABLES_VALUE_INT,
STIX_CORE_OBJECTS,
STIX_CYBER_OBSERVABLE_MAPPING,
STIX_META_OBJECTS,
OpenCTIStix2Utils,
)
datefinder.ValueError = ValueError, OverflowError
# For Python 3.11+, datetime.UTC is preferred over datetime.timezone.utc
# Fallback to datetime.timezone.utc for older Python versions
[docs]
UTC = getattr(datetime, "UTC", datetime.timezone.utc)
# Spec version
[docs]
ERROR_TYPE_LOCK = "LOCK_ERROR"
[docs]
ERROR_TYPE_MISSING_REFERENCE = "MISSING_REFERENCE_ERROR"
[docs]
ERROR_TYPE_BAD_GATEWAY = "Bad Gateway"
[docs]
ERROR_TYPE_DRAFT_LOCK = "DRAFT_LOCKED"
[docs]
ERROR_TYPE_WORK_NOT_ALIVE = "WORK_NOT_ALIVE"
[docs]
ERROR_TYPE_TIMEOUT = "Request timed out"
#: STIX Extension ID for OpenCTI custom objects and properties
[docs]
STIX_EXT_OCTI: str = "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"
#: STIX Extension ID for OpenCTI custom Cyber Observables (SCO)
[docs]
STIX_EXT_OCTI_SCO: str = "extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82"
#: STIX Extension ID for MITRE ATT&CK framework objects
[docs]
STIX_EXT_MITRE: str = "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b"
[docs]
PROCESSING_COUNT: int = 4
[docs]
MAX_PROCESSING_COUNT: int = 100
[docs]
MARKDOWN_EXPORT_FIELDS: Tuple[str, ...] = (
"description",
"x_opencti_description",
"content",
)
[docs]
meter = metrics.get_meter(__name__)
[docs]
bundles_timeout_error_counter = meter.create_counter(
name="opencti_bundles_timeout_error_counter",
description="number of bundles in timeout error",
)
[docs]
bundles_lock_error_counter = meter.create_counter(
name="opencti_bundles_lock_error_counter",
description="number of bundles in lock error",
)
[docs]
bundles_missing_reference_error_counter = meter.create_counter(
name="opencti_bundles_missing_reference_error_counter",
description="number of bundles in missing reference error",
)
[docs]
bundles_bad_gateway_error_counter = meter.create_counter(
name="opencti_bundles_bad_gateway_error_counter",
description="number of bundles in bad gateway error",
)
[docs]
bundles_timed_out_error_counter = meter.create_counter(
name="opencti_bundles_timed_out_error_counter",
description="number of bundles in timed out error",
)
[docs]
bundles_technical_error_counter = meter.create_counter(
name="opencti_bundles_technical_error_counter",
description="number of bundles in technical error",
)
[docs]
bundles_success_counter = meter.create_counter(
name="opencti_bundles_success_counter",
description="number of bundles successfully processed",
)
[docs]
class OpenCTIStix2:
"""Python API for Stix2 in OpenCTI.
Handles conversion between STIX2 format and OpenCTI internal format,
including import/export operations and bundle processing.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
def __init__(self, opencti):
"""Initialize the OpenCTIStix2 helper.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.stix2_update = OpenCTIStix2Update(opencti)
[docs]
self.mapping_cache = LRUCache(maxsize=50000)
[docs]
self.mapping_cache_permanent = {}
[docs]
def get_in_cache(self, data_id):
"""Get an item from the cache.
:param data_id: ID of the data to retrieve
:type data_id: str
:return: Cached data or None if not found
:rtype: dict or None
"""
api_draft_id = self.opencti.get_draft_id()
if data_id + api_draft_id in self.mapping_cache:
return self.mapping_cache[data_id + api_draft_id]
return None
[docs]
def set_in_cache(self, data_id, data):
"""Store an item in the cache.
:param data_id: ID of the data to store
:type data_id: str
:param data: Data to cache
:type data: dict
"""
api_draft_id = self.opencti.get_draft_id()
self.mapping_cache[data_id + api_draft_id] = data
######### UTILS
# region utils
[docs]
def unknown_type(self, stix_object: Dict) -> None:
"""Log an error for unknown STIX object types.
:param stix_object: STIX object with unknown type
:type stix_object: Dict
"""
self.opencti.app_logger.error(
"Unknown object type, doing nothing...", {"type": stix_object["type"]}
)
[docs]
def convert_markdown(self, text: str) -> str:
"""Convert input text to markdown style code annotation.
:param text: Input text to convert
:type text: str
:return: Sanitized text with markdown style code annotation
:rtype: str
"""
if text is not None:
# (.*?) → lazy match captures the content between the tags (shortest possible, so nested/adjacent pairs don't bleed into each other)
# re.DOTALL → allows . to match newlines, so multi-line code content inside the tags still works
# \1 in the replacement → puts the captured content back between the backticks
return re.sub(r"<code>(.*?)</code>", r"`\1`", text, flags=re.DOTALL)
else:
return None
@staticmethod
def _extract_embedded_storage_path(
uri: str,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
) -> Optional[str]:
if not isinstance(uri, str):
return None
def _normalize_embedded_path(path: str) -> Optional[str]:
normalized_path = unquote(path).split("?", 1)[0].split("#", 1)[0]
normalized_path = normalized_path.lstrip("/")
if not normalized_path.startswith("embedded/"):
return None
# Relative markdown links such as embedded/file.png need entity context.
if entity_type and entity_id:
relative_tail = normalized_path[len("embedded/") :]
expected_prefix = f"{entity_type}/{entity_id}/"
# Only prefix single-segment paths (embedded/file.png).
# If path already includes folders, keep it as-is.
if "/" not in relative_tail and not relative_tail.startswith(
expected_prefix
):
normalized_path = f"embedded/{expected_prefix}{relative_tail}"
return normalized_path
normalized_uri = unquote(uri)
parsed_uri = urlparse(normalized_uri)
normalized_path = parsed_uri.path or normalized_uri
direct_embedded_path = _normalize_embedded_path(normalized_path)
if direct_embedded_path is not None:
return direct_embedded_path
return None
def _rewrite_markdown_embedded_storage_images(
self,
markdown: str,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
) -> str:
if not isinstance(markdown, str):
return markdown
normalized_markdown = unquote(markdown)
if "embedded/" not in normalized_markdown:
return markdown
embedded_data_uri_by_path: Dict[str, str] = {}
def _replace_image(alt_text: str, url: str, full_match: str) -> str:
embedded_storage_path = self._extract_embedded_storage_path(
url,
entity_type=entity_type,
entity_id=entity_id,
)
if embedded_storage_path is None:
return full_match
guessed_mime_type = mimetypes.guess_type(embedded_storage_path)[0]
if embedded_storage_path not in embedded_data_uri_by_path:
storage_url = urljoin(
self.opencti.api_url.replace("graphql", ""),
f"storage/get/{embedded_storage_path}",
)
encoded_data = self.opencti.fetch_opencti_file(
storage_url,
binary=True,
serialize=True,
)
if not encoded_data:
# Some HTTP stacks reject raw spaces/special chars in path; retry with encoded path.
encoded_storage_path = quote(embedded_storage_path, safe="/")
encoded_storage_url = urljoin(
self.opencti.api_url.replace("graphql", ""),
f"storage/get/{encoded_storage_path}",
)
if encoded_storage_url != storage_url:
encoded_data = self.opencti.fetch_opencti_file(
encoded_storage_url,
binary=True,
serialize=True,
)
if not encoded_data:
return full_match
mime_type = (
guessed_mime_type
or mimetypes.guess_type(embedded_storage_path)[0]
or "application/octet-stream"
)
if not mime_type.startswith("image/"):
return full_match
embedded_data_uri_by_path[embedded_storage_path] = (
f"data:{mime_type};base64,{encoded_data}"
)
return f""
return rewrite_markdown_images(markdown, _replace_image)
def _rewrite_embedded_image_uris_for_export(self, entity: Dict) -> None:
if not isinstance(entity, dict):
return
entity_type = entity.get("x_opencti_type") or entity.get("entity_type")
entity_id = entity.get("x_opencti_id") or entity.get("id")
for key in MARKDOWN_EXPORT_FIELDS:
value = entity.get(key)
if isinstance(value, str):
entity[key] = self._rewrite_markdown_embedded_storage_images(
value,
entity_type=entity_type,
entity_id=entity_id,
)
def _rewrite_embedded_image_uris_in_bundle_for_export(self, bundle: Dict) -> None:
if not isinstance(bundle, dict):
return
objects = bundle.get("objects")
if not isinstance(objects, list):
return
for item in objects:
self._rewrite_embedded_image_uris_for_export(item)
[docs]
def filter_objects(self, uuids: List, objects: List) -> List:
"""Filter objects based on UUIDs.
:param uuids: List of UUIDs to filter by
:type uuids: list
:param objects: List of objects to filter
:type objects: list
:return: List of filtered objects not in the uuids list
:rtype: list
"""
result = []
if objects is not None:
for item in objects:
if "id" in item and item["id"] not in uuids:
result.append(item)
return result
[docs]
def pick_aliases(self, stix_object: Dict) -> Optional[List]:
"""Check STIX2 object for multiple aliases and return a list.
:param stix_object: Valid STIX2 object
:type stix_object: Dict
:return: List of aliases or None if no aliases found
:rtype: list or None
"""
# Add aliases
if "x_opencti_aliases" in stix_object:
return stix_object["x_opencti_aliases"]
elif "x_mitre_aliases" in stix_object:
return stix_object["x_mitre_aliases"]
elif "x_amitt_aliases" in stix_object:
return stix_object["x_amitt_aliases"]
elif "aliases" in stix_object:
return stix_object["aliases"]
return None
[docs]
def import_bundle_from_file(
self,
file_path: str,
update: bool = False,
types: List = None,
) -> Optional[Tuple[list, list]]:
"""Import a STIX2 bundle from a file.
:param file_path: Valid path to the file
:type file_path: str
:param update: Whether to update data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
:return: Tuple of (imported objects, failed objects) or None if file not found
:rtype: Tuple[list, list] or None
"""
if not os.path.isfile(file_path):
self.opencti.app_logger.error("The bundle file does not exist")
return None
with open(os.path.join(file_path), encoding="utf-8") as file:
data = json.load(file)
return self.import_bundle(data, update, types, None)
[docs]
def import_bundle_from_json(
self,
json_data: Union[str, bytes],
update: bool = False,
types: List = None,
work_id: str = None,
objects_max_refs: int = 0,
) -> Tuple[list, list]:
"""Import a STIX2 bundle from JSON data.
:param json_data: JSON data as string or bytes
:type json_data: str or bytes
:param update: Whether to update data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
:param work_id: Work ID for tracking import progress
:type work_id: str, optional
:param objects_max_refs: Maximum object references; rejects import if exceeded
:type objects_max_refs: int, optional
:return: Tuple of (imported objects, objects with too many dependencies)
:rtype: Tuple[list, list]
"""
data = json.loads(json_data)
return self.import_bundle(data, update, types, work_id, objects_max_refs)
[docs]
def resolve_author(self, title: str) -> Optional[Identity]:
"""Resolve an author identity from a title string.
:param title: Title to search for known author names
:type title: str
:return: Identity object if author found, None otherwise
:rtype: Identity or None
"""
if "fireeye" in title.lower() or "mandiant" in title.lower():
return self.get_author("FireEye")
if "eset" in title.lower():
return self.get_author("ESET")
if "dragos" in title.lower():
return self.get_author("Dragos")
if "us-cert" in title.lower():
return self.get_author("US-CERT")
if (
"unit 42" in title.lower()
or "unit42" in title.lower()
or "palo alto" in title.lower()
):
return self.get_author("Palo Alto Networks")
if "accenture" in title.lower():
return self.get_author("Accenture")
if "symantec" in title.lower():
return self.get_author("Symantec")
if "trendmicro" in title.lower() or "trend micro" in title.lower():
return self.get_author("Trend Micro")
if "mcafee" in title.lower():
return self.get_author("McAfee")
if "crowdstrike" in title.lower():
return self.get_author("CrowdStrike")
if "securelist" in title.lower() or "kaspersky" in title.lower():
return self.get_author("Kaspersky")
if "f-secure" in title.lower():
return self.get_author("F-Secure")
if "checkpoint" in title.lower():
return self.get_author("CheckPoint")
if "talos" in title.lower():
return self.get_author("Cisco Talos")
if "secureworks" in title.lower():
return self.get_author("Dell SecureWorks")
if "microsoft" in title.lower():
return self.get_author("Microsoft")
if "mitre att&ck" in title.lower():
return self.get_author("The MITRE Corporation")
return None
[docs]
def get_author(self, name: str) -> Identity:
"""Get or create an author identity by name.
:param name: Name of the author organization
:type name: str
:return: Identity object for the author
:rtype: Identity
"""
name_in_cache = self.get_in_cache(name)
if name_in_cache is not None:
return name_in_cache
else:
author = self.opencti.identity.create(
type="Organization",
name=name,
description="",
)
self.set_in_cache(name, author)
return author
# Please use get_reader instead of this definition
[docs]
def get_readers(self):
"""Get a dictionary mapping entity types to their read methods.
:return: Dictionary mapping entity types to read functions
:rtype: dict
"""
return {
"Attack-Pattern": self.opencti.attack_pattern.read,
"Campaign": self.opencti.campaign.read,
"Case-Incident": self.opencti.case_incident.read,
"Case-Rfi": self.opencti.case_rfi.read,
"Case-Rft": self.opencti.case_rft.read,
"Channel": self.opencti.channel.read,
"Course-Of-Action": self.opencti.course_of_action.read,
"Data-Component": self.opencti.data_component.read,
"Data-Source": self.opencti.data_source.read,
"Event": self.opencti.event.read,
"External-Reference": self.opencti.external_reference.read,
"Feedback": self.opencti.feedback.read,
"Grouping": self.opencti.grouping.read,
"Incident": self.opencti.incident.read,
"Identity": self.opencti.identity.read,
"Indicator": self.opencti.indicator.read,
"Infrastructure": self.opencti.infrastructure.read,
"Intrusion-Set": self.opencti.intrusion_set.read,
"Kill-Chain-Phase": self.opencti.kill_chain_phase.read,
"Label": self.opencti.label.read,
"Location": self.opencti.location.read,
"Language": self.opencti.language.read,
"Malware": self.opencti.malware.read,
"Malware-Analysis": self.opencti.malware_analysis.read,
"Marking-Definition": self.opencti.marking_definition.read,
"Narrative": self.opencti.narrative.read,
"Note": self.opencti.note.read,
"Observed-Data": self.opencti.observed_data.read,
"Opinion": self.opencti.opinion.read,
"Report": self.opencti.report.read,
"Stix-Core-Object": self.opencti.stix_core_object.read,
"Stix-Cyber-Observable": self.opencti.stix_cyber_observable.read,
"Stix-Domain-Object": self.opencti.stix_domain_object.read,
"stix-core-relationship": self.opencti.stix_core_relationship.read,
"stix-sighting-relationship": self.opencti.stix_sighting_relationship.read,
"stix-nested-relationship": self.opencti.stix_nested_ref_relationship.read,
"Task": self.opencti.task.read,
"Threat-Actor": self.opencti.threat_actor.read,
"Threat-Actor-Group": self.opencti.threat_actor_group.read,
"Threat-Actor-Individual": self.opencti.threat_actor_individual.read,
"Tool": self.opencti.tool.read,
"Vocabulary": self.opencti.vocabulary.read,
"Vulnerability": self.opencti.vulnerability.read,
"Security-Coverage": self.opencti.security_coverage.read,
}
[docs]
def get_reader(self, entity_type: str):
"""Get the appropriate reader function for a given entity type.
:param entity_type: Type of the entity
:type entity_type: str
:return: Reader function for the entity type
:rtype: callable or None
"""
# Map types
if entity_type == "StixFile":
entity_type = "File"
if IdentityTypes.has_value(entity_type):
entity_type = "Identity"
if LocationTypes.has_value(entity_type):
entity_type = "Location"
if entity_type == "Container":
entity_type = "Stix-Domain-Object"
if StixCyberObservableTypes.has_value(entity_type):
entity_type = "Stix-Cyber-Observable"
readers = self.get_readers()
return readers.get(
entity_type, lambda **kwargs: self.unknown_type({"type": entity_type})
)
# endregion
[docs]
def get_stix_helper(self):
"""Get a dictionary mapping STIX types to their helper functions.
:return: Dictionary mapping STIX types to generate_id functions
:rtype: dict
"""
# Import
return {
# entities
"attack-pattern": self.opencti.attack_pattern,
"campaign": self.opencti.campaign,
"note": self.opencti.note,
"observed-data": self.opencti.observed_data,
"opinion": self.opencti.opinion,
"report": self.opencti.report,
"course-of-action": self.opencti.course_of_action,
"identity": self.opencti.identity,
"infrastructure": self.opencti.infrastructure,
"intrusion-set": self.opencti.intrusion_set,
"location": self.opencti.location,
"malware": self.opencti.malware,
"threat-actor": self.opencti.threat_actor,
"tool": self.opencti.tool,
"vulnerability": self.opencti.vulnerability,
"incident": self.opencti.incident,
"x-opencti-incident": self.opencti.incident,
"marking-definition": self.opencti.marking_definition,
"case-rfi": self.opencti.case_rfi,
"x-opencti-case-rfi": self.opencti.case_rfi,
"case-rft": self.opencti.case_rft,
"x-opencti-case-rft": self.opencti.case_rft,
"case-incident": self.opencti.case_incident,
"x-opencti-case-incident": self.opencti.case_incident,
"feedback": self.opencti.feedback,
"x-opencti-feedback": self.opencti.feedback,
"channel": self.opencti.channel,
"data-component": self.opencti.data_component,
"x-mitre-data-component": self.opencti.data_component,
"data-source": self.opencti.data_source,
"x-mitre-data-source": self.opencti.data_source,
"event": self.opencti.event,
"grouping": self.opencti.grouping,
"indicator": self.opencti.indicator,
"language": self.opencti.language,
"malware-analysis": self.opencti.malware_analysis,
"narrative": self.opencti.narrative,
"task": self.opencti.task,
"x-opencti-task": self.opencti.task,
"security-coverage": self.opencti.security_coverage,
"vocabulary": self.opencti.vocabulary,
# relationships
"relationship": self.opencti.stix_core_relationship,
"sighting": self.opencti.stix_sighting_relationship,
}
[docs]
def get_internal_helper(self):
"""Get a dictionary mapping internal types to their helper functions.
:return: Dictionary mapping internal types to generate_id functions
:rtype: dict
"""
# Import
return {
"user": self.opencti.user,
"group": self.opencti.group,
"capability": self.opencti.capability,
"role": self.opencti.role,
"settings": self.opencti.settings,
"work": self.opencti.work,
"deleteoperation": self.opencti.trash,
"draftworkspace": self.opencti.draft,
"playbook": self.opencti.playbook,
"workspace": self.opencti.workspace,
"publicdashboard": self.opencti.public_dashboard,
"notification": self.opencti.notification,
"internalfile": self.opencti.internal_file,
}
[docs]
def generate_standard_id_from_stix(self, data):
"""Generate a standard ID from STIX data.
:param data: STIX data dictionary
:type data: dict
:return: Generated standard ID or None
:rtype: str or None
"""
stix_helpers = self.get_stix_helper()
helper = stix_helpers.get(data["type"])
return helper.generate_id_from_data(data)
# region import
[docs]
def import_object(
self, stix_object: Dict, update: bool = False, types: List = None
) -> Optional[List]:
"""Import a STIX2 object into OpenCTI.
:param stix_object: Valid STIX2 object to import
:type stix_object: Dict
:param update: Whether to update data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
:return: List of imported STIX2 objects or None on failure
:rtype: list or None
"""
self.opencti.app_logger.info(
"Importing an object",
{"type": stix_object["type"], "id": stix_object["id"]},
)
# Extract
embedded_relationships = self.extract_embedded_relationships(stix_object, types)
created_by_id = embedded_relationships["created_by"]
object_marking_ids = embedded_relationships["object_marking"]
object_label_ids = embedded_relationships["object_label"]
open_vocabs = embedded_relationships["open_vocabs"]
kill_chain_phases_ids = embedded_relationships["kill_chain_phases"]
object_refs_ids = embedded_relationships["object_refs"]
external_references_ids = embedded_relationships["external_references"]
reports = embedded_relationships["reports"]
sample_refs_ids = embedded_relationships["sample_refs"]
# Extract files
x_opencti_files = []
if "x_opencti_files" in stix_object:
x_opencti_files.extend(stix_object["x_opencti_files"])
if self.opencti.get_attribute_in_extension("files", stix_object) is not None:
x_opencti_files.extend(
self.opencti.get_attribute_in_extension("files", stix_object)
)
# Prepare all files for direct upload during creation
files_to_upload = []
files_markings = []
no_trigger_import = []
embedded_flags = []
for file_obj in x_opencti_files:
data = None
if "data" in file_obj:
data = base64.b64decode(file_obj["data"])
if data is not None:
files_to_upload.append(
self.opencti.file(
file_obj["name"],
data,
file_obj.get("mime_type", "application/octet-stream"),
)
)
files_markings.append(file_obj.get("object_marking_refs", None))
no_trigger_import.append(file_obj.get("no_trigger_import", False))
embedded_flags.append(file_obj.get("embedded", False))
# Extra
extras = {
"created_by_id": created_by_id,
"object_marking_ids": object_marking_ids,
"object_label_ids": object_label_ids,
"open_vocabs": open_vocabs,
"kill_chain_phases_ids": kill_chain_phases_ids,
"object_ids": object_refs_ids,
"external_references_ids": external_references_ids,
"reports": reports,
"sample_ids": sample_refs_ids,
"files": files_to_upload if files_to_upload else None,
"filesMarkings": files_markings if files_markings else None,
"noTriggerImport": no_trigger_import if no_trigger_import else None,
"embedded": embedded_flags if embedded_flags else None,
}
stix_helper = self.get_stix_helper().get(stix_object["type"])
if stix_helper:
stix_object_results = stix_helper.import_from_stix2(
stixObject=stix_object, extras=extras, update=update
)
else:
stix_object_results = None
self.opencti.app_logger.error(
"Unknown object type, doing nothing...", {"type": stix_object["type"]}
)
if stix_object_results is None:
return None
if not isinstance(stix_object_results, list):
stix_object_results = [stix_object_results]
for stix_object_result in stix_object_results:
self.set_in_cache(
stix_object["id"],
{
"id": stix_object_result["id"],
"type": stix_object_result["entity_type"],
"observables": (
stix_object_result["observables"]
if "observables" in stix_object_result
else []
),
},
)
self.set_in_cache(
stix_object_result["id"],
{
"id": stix_object_result["id"],
"type": stix_object_result["entity_type"],
"observables": (
stix_object_result["observables"]
if "observables" in stix_object_result
else []
),
},
)
# Add reports from external references
for external_reference_id in external_references_ids:
if external_reference_id in reports:
self.opencti.report.add_stix_object_or_stix_relationship(
id=reports[external_reference_id]["id"],
stixObjectOrStixRelationshipId=stix_object_result["id"],
)
return stix_object_results
[docs]
def import_observable(
self, stix_object: Dict, update: bool = False, types: List = None
) -> None:
"""Import a STIX cyber observable into OpenCTI.
:param stix_object: Valid STIX2 cyber observable object
:type stix_object: Dict
:param update: Whether to update existing data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
"""
# Extract
embedded_relationships = self.extract_embedded_relationships(stix_object, types)
created_by_id = embedded_relationships["created_by"]
object_marking_ids = embedded_relationships["object_marking"]
object_label_ids = embedded_relationships["object_label"]
open_vocabs = embedded_relationships["open_vocabs"]
granted_refs_ids = embedded_relationships["granted_refs"]
kill_chain_phases_ids = embedded_relationships["kill_chain_phases"]
object_refs_ids = embedded_relationships["object_refs"]
external_references_ids = embedded_relationships["external_references"]
reports = embedded_relationships["reports"]
sample_refs_ids = embedded_relationships["sample_refs"]
# Extract files
x_opencti_files = []
if "x_opencti_files" in stix_object:
x_opencti_files.extend(stix_object["x_opencti_files"])
if self.opencti.get_attribute_in_extension("files", stix_object) is not None:
x_opencti_files.extend(
self.opencti.get_attribute_in_extension("files", stix_object)
)
# Prepare all files for direct upload during creation
files_to_upload = []
files_markings = []
no_trigger_import = []
embedded_flags = []
for file_obj in x_opencti_files:
data = None
if "data" in file_obj:
data = base64.b64decode(file_obj["data"])
if data is not None:
files_to_upload.append(
self.opencti.file(
file_obj["name"],
data,
file_obj.get("mime_type", "application/octet-stream"),
)
)
files_markings.append(file_obj.get("object_marking_refs", None))
no_trigger_import.append(file_obj.get("no_trigger_import", False))
embedded_flags.append(file_obj.get("embedded", False))
# Extra
extras = {
"created_by_id": created_by_id,
"object_marking_ids": object_marking_ids,
"object_label_ids": object_label_ids,
"open_vocabs": open_vocabs,
"granted_refs_ids": granted_refs_ids,
"kill_chain_phases_ids": kill_chain_phases_ids,
"object_ids": object_refs_ids,
"external_references_ids": external_references_ids,
"reports": reports,
"sample_ids": sample_refs_ids,
"files": files_to_upload if files_to_upload else None,
"filesMarkings": files_markings if files_markings else None,
"noTriggerImport": no_trigger_import if no_trigger_import else None,
"embedded": embedded_flags if embedded_flags else None,
}
upsert_operations = self.opencti.get_attribute_in_extension(
"opencti_upsert_operations", stix_object
)
if stix_object["type"] == "simple-observable":
stix_observable_result = self.opencti.stix_cyber_observable.create(
simple_observable_id=stix_object["id"],
simple_observable_key=stix_object["key"],
simple_observable_value=(
stix_object["value"]
if stix_object["key"] not in OBSERVABLES_VALUE_INT
else int(stix_object["value"])
),
simple_observable_description=(
stix_object["description"] if "description" in stix_object else None
),
x_opencti_score=(
stix_object["x_opencti_score"]
if "x_opencti_score" in stix_object
else None
),
createdBy=(
extras["created_by_id"] if "created_by_id" in extras else None
),
objectMarking=(
extras["object_marking_ids"]
if "object_marking_ids" in extras
else []
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
createIndicator=(
stix_object["x_opencti_create_indicator"]
if "x_opencti_create_indicator" in stix_object
else None
),
objectOrganization=(
extras["granted_refs_ids"] if "granted_refs_ids" in extras else []
),
update=update,
files=extras.get("files"),
filesMarkings=extras.get("filesMarkings"),
noTriggerImport=extras.get("noTriggerImport", False),
embedded=extras.get("embedded", None),
upsert_operations=upsert_operations,
)
else:
stix_observable_result = self.opencti.stix_cyber_observable.create(
observableData=stix_object,
createdBy=(
extras["created_by_id"] if "created_by_id" in extras else None
),
objectMarking=(
extras["object_marking_ids"]
if "object_marking_ids" in extras
else []
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
objectOrganization=(
extras["granted_refs_ids"] if "granted_refs_ids" in extras else []
),
update=update,
files=extras.get("files"),
filesMarkings=extras.get("filesMarkings"),
noTriggerImport=extras.get("noTriggerImport", False),
embedded=extras.get("embedded", None),
upsert_operations=upsert_operations,
)
if stix_observable_result is not None:
if "id" in stix_object:
self.set_in_cache(
stix_object["id"],
{
"id": stix_observable_result["id"],
"type": stix_observable_result["entity_type"],
},
)
self.set_in_cache(
stix_observable_result["id"],
{
"id": stix_observable_result["id"],
"type": stix_observable_result["entity_type"],
},
)
# Iterate over refs to create appropriate relationships
for key in stix_object.keys():
if key not in [
"created_by_ref",
"object_marking_refs",
"x_opencti_created_by_ref",
"x_opencti_granted_refs",
"src_ref",
"dst_ref",
]:
if key.endswith("_ref"):
relationship_type = key.replace("_ref", "")
if relationship_type.startswith("x_opencti_"):
relationship_type = relationship_type.split(
"x_opencti_", 1
)[1]
relationship_type = relationship_type.replace("_", "-")
relationship_type = "x_opencti_" + relationship_type
else:
relationship_type = relationship_type.replace("_", "-")
self.opencti.stix_nested_ref_relationship.create(
fromId=stix_observable_result["id"],
toId=stix_object[key],
relationship_type=relationship_type,
)
elif key.endswith("_refs"):
relationship_type = key.replace("_refs", "")
if relationship_type.startswith("x_opencti_"):
relationship_type = relationship_type.split(
"x_opencti_", 1
)[1]
relationship_type = relationship_type.replace("_", "-")
relationship_type = "x_opencti_" + relationship_type
else:
relationship_type = relationship_type.replace("_", "-")
for value in stix_object[key]:
self.opencti.stix_nested_ref_relationship.create(
fromId=stix_observable_result["id"],
toId=value,
relationship_type=relationship_type,
)
else:
return None
[docs]
def import_relationship(
self, stix_relation: Dict, update: bool = False, types: List = None
) -> None:
"""Import a STIX core relationship into OpenCTI.
:param stix_relation: Valid STIX2 relationship object
:type stix_relation: Dict
:param update: Whether to update existing data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
"""
# Extract
embedded_relationships = self.extract_embedded_relationships(
stix_relation, types
)
created_by_id = embedded_relationships["created_by"]
object_marking_ids = embedded_relationships["object_marking"]
object_label_ids = embedded_relationships["object_label"]
open_vocabs = embedded_relationships["open_vocabs"]
granted_refs_ids = embedded_relationships["granted_refs"]
kill_chain_phases_ids = embedded_relationships["kill_chain_phases"]
object_refs_ids = embedded_relationships["object_refs"]
external_references_ids = embedded_relationships["external_references"]
reports = embedded_relationships["reports"]
sample_refs_ids = embedded_relationships["sample_refs"]
# Extra
extras = {
"created_by_id": created_by_id,
"object_marking_ids": object_marking_ids,
"object_label_ids": object_label_ids,
"open_vocabs": open_vocabs,
"granted_refs_ids": granted_refs_ids,
"kill_chain_phases_ids": kill_chain_phases_ids,
"object_ids": object_refs_ids,
"external_references_ids": external_references_ids,
"reports": reports,
"sample_ids": sample_refs_ids,
}
# Create the relation
# Try to guess start_time / stop_time from external reference
date = None
if "external_references" in stix_relation:
for external_reference in stix_relation["external_references"]:
try:
if "description" in external_reference:
matches = datefinder.find_dates(
external_reference["description"],
base_date=datetime.datetime.fromtimestamp(0),
)
else:
matches = datefinder.find_dates(
external_reference["source_name"],
base_date=datetime.datetime.fromtimestamp(0),
)
except (TypeError, OverflowError):
matches = None
date = None
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
if matches is not None:
try:
for match in matches:
if (
match.timestamp() < yesterday.timestamp()
and len(str(match.year)) == 4
):
date = match.strftime("%Y-%m-%dT%H:%M:%SZ")
break
except (TypeError, OverflowError):
date = None
stix_relation_result = self.opencti.stix_core_relationship.import_from_stix2(
stixRelation=stix_relation, extras=extras, update=update, defaultDate=date
)
if stix_relation_result is not None:
self.set_in_cache(
stix_relation["id"],
{
"id": stix_relation_result["id"],
"type": stix_relation_result["entity_type"],
},
)
else:
return None
# Add external references
for external_reference_id in external_references_ids:
if external_reference_id in reports:
self.opencti.report.add_stix_object_or_stix_relationship(
id=reports[external_reference_id]["id"],
stixObjectOrStixRelationshipId=stix_relation_result["id"],
)
self.opencti.report.add_stix_object_or_stix_relationship(
id=reports[external_reference_id]["id"],
stixObjectOrStixRelationshipId=stix_relation["source_ref"],
)
self.opencti.report.add_stix_object_or_stix_relationship(
id=reports[external_reference_id]["id"],
stixObjectOrStixRelationshipId=stix_relation["target_ref"],
)
[docs]
def import_sighting(
self,
stix_sighting: Dict,
from_id: str,
to_id: str,
update: bool = False,
types: List = None,
) -> None:
"""Import a STIX sighting relationship into OpenCTI.
:param stix_sighting: Valid STIX2 sighting object
:type stix_sighting: Dict
:param from_id: ID of the source entity (sighting_of_ref)
:type from_id: str
:param to_id: ID of the target entity (where_sighted_ref)
:type to_id: str
:param update: Whether to update existing data in the database, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: list, optional
"""
# Extract
embedded_relationships = self.extract_embedded_relationships(
stix_sighting, types
)
created_by_id = embedded_relationships["created_by"]
object_marking_ids = embedded_relationships["object_marking"]
object_label_ids = embedded_relationships["object_label"]
open_vocabs = embedded_relationships["open_vocabs"]
granted_refs_ids = embedded_relationships["granted_refs"]
kill_chain_phases_ids = embedded_relationships["kill_chain_phases"]
object_refs_ids = embedded_relationships["object_refs"]
external_references_ids = embedded_relationships["external_references"]
reports = embedded_relationships["reports"]
sample_refs_ids = embedded_relationships["sample_refs"]
# Extra
extras = {
"created_by_id": created_by_id,
"object_marking_ids": object_marking_ids,
"object_label_ids": object_label_ids,
"open_vocabs": open_vocabs,
"granted_refs_ids": granted_refs_ids,
"kill_chain_phases_ids": kill_chain_phases_ids,
"object_ids": object_refs_ids,
"external_references_ids": external_references_ids,
"reports": reports,
"sample_ids": sample_refs_ids,
}
# Create the sighting
upsert_operations = self.opencti.get_attribute_in_extension(
"opencti_upsert_operations", stix_sighting
)
if (
"x_opencti_negative" not in stix_sighting
and self.opencti.get_attribute_in_extension("negative", stix_sighting)
is not None
):
stix_sighting["x_opencti_negative"] = (
self.opencti.get_attribute_in_extension("negative", stix_sighting)
)
if "x_opencti_workflow_id" not in stix_sighting:
stix_sighting["x_opencti_workflow_id"] = (
self.opencti.get_attribute_in_extension("workflow_id", stix_sighting)
)
stix_sighting_result = self.opencti.stix_sighting_relationship.create(
fromId=from_id,
toId=to_id,
stix_id=stix_sighting["id"] if "id" in stix_sighting else None,
description=(
self.convert_markdown(stix_sighting["description"])
if "description" in stix_sighting
else None
),
first_seen=(
stix_sighting["first_seen"] if "first_seen" in stix_sighting else None
),
last_seen=(
stix_sighting["last_seen"] if "last_seen" in stix_sighting else None
),
count=stix_sighting["count"] if "count" in stix_sighting else 1,
x_opencti_negative=(
stix_sighting["x_opencti_negative"]
if "x_opencti_negative" in stix_sighting
else False
),
created=stix_sighting["created"] if "created" in stix_sighting else None,
modified=stix_sighting["modified"] if "modified" in stix_sighting else None,
confidence=(
stix_sighting["confidence"] if "confidence" in stix_sighting else None
),
createdBy=extras["created_by_id"] if "created_by_id" in extras else None,
objectMarking=(
extras["object_marking_ids"] if "object_marking_ids" in extras else []
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
objectOrganization=(
extras["granted_refs_ids"] if "granted_refs_ids" in extras else []
),
x_opencti_workflow_id=(
stix_sighting["x_opencti_workflow_id"]
if "x_opencti_workflow_id" in stix_sighting
else None
),
x_opencti_stix_ids=(
stix_sighting["x_opencti_stix_ids"]
if "x_opencti_stix_ids" in stix_sighting
else None
),
update=update,
ignore_dates=(
stix_sighting["x_opencti_ignore_dates"]
if "x_opencti_ignore_dates" in stix_sighting
else None
),
upsert_operations=upsert_operations,
)
if stix_sighting_result is not None:
self.set_in_cache(
stix_sighting["id"],
{
"id": stix_sighting_result["id"],
"type": stix_sighting_result["entity_type"],
},
)
else:
return None
# endregion
# region export
[docs]
def generate_export(self, entity: Dict, no_custom_attributes: bool = False) -> Dict:
"""Generate a STIX2 export from an OpenCTI entity.
:param entity: OpenCTI entity dictionary to export
:type entity: Dict
:param no_custom_attributes: Whether to exclude custom x_opencti attributes, defaults to False
:type no_custom_attributes: bool, optional
:return: STIX2 formatted entity dictionary
:rtype: Dict
"""
# Handle model deviation
original_entity_type = entity["entity_type"]
# Identities
if IdentityTypes.has_value(entity["entity_type"]):
entity["entity_type"] = "Identity"
# Threat-Actors
if ThreatActorTypes.has_value(entity["entity_type"]):
if not no_custom_attributes:
entity["x_opencti_type"] = entity["entity_type"]
if entity["entity_type"] == "Threat-Actor-Group":
entity["threat_actor_group"] = entity["name"]
entity["entity_type"] = "Threat-Actor"
# Locations
if LocationTypes.has_value(entity["entity_type"]):
if not no_custom_attributes:
entity["x_opencti_location_type"] = entity["entity_type"]
if entity["entity_type"] == "City":
entity["city"] = entity["name"]
elif entity["entity_type"] == "Country":
entity["country"] = entity["name"]
elif entity["entity_type"] == "Region":
entity["region"] = entity["name"]
entity["entity_type"] = "Location"
# Malware
if entity["entity_type"] == "Malware":
if "is_family" not in entity or not isinstance(entity["is_family"], bool):
entity["is_family"] = True
# Threat Actor
if entity["entity_type"] == "Threat-Actor-Group":
entity["entity_type"] = "Threat-Actor"
if entity["entity_type"] == "Threat-Actor-Individual":
entity["entity_type"] = "Threat-Actor"
# Files
if entity["entity_type"] == "StixFile":
entity["entity_type"] = "File"
# Case Incident
if entity["entity_type"] == "Case-Incident":
entity["standard_id"] = "x-opencti-" + entity["standard_id"]
entity["entity_type"] = "x-opencti-" + entity["entity_type"]
# Case RFI
if entity["entity_type"] == "Case-Rfi":
entity["standard_id"] = "x-opencti-" + entity["standard_id"]
entity["entity_type"] = "x-opencti-" + entity["entity_type"]
# Case RFT
if entity["entity_type"] == "Case-Rft":
entity["standard_id"] = "x-opencti-" + entity["standard_id"]
entity["entity_type"] = "x-opencti-" + entity["entity_type"]
# Feedback
if entity["entity_type"] == "Feedback":
entity["standard_id"] = "x-opencti-" + entity["standard_id"]
entity["entity_type"] = "x-opencti-" + entity["entity_type"]
# Task
if entity["entity_type"] == "Task":
entity["standard_id"] = "x-opencti-" + entity["standard_id"]
entity["entity_type"] = "x-opencti-" + entity["entity_type"]
# Data component
if entity["entity_type"] == "Data-Component":
entity["standard_id"] = "x-mitre-" + entity["standard_id"]
entity["entity_type"] = "x-mitre-" + entity["entity_type"]
# Data source
if entity["entity_type"] == "Data-Source":
entity["standard_id"] = "x-mitre-" + entity["standard_id"]
entity["entity_type"] = "x-mitre-" + entity["entity_type"]
if "platforms" in entity and entity["platforms"] is not None:
entity["x_mitre_platforms"] = entity["platforms"]
del entity["platforms"]
if (
"collection_layers" in entity
and entity["collection_layers"] is not None
):
entity["x_mitre_collection_layers"] = entity["collection_layers"]
del entity["collection_layers"]
# Dates
if (
"valid_from" in entity
and "valid_until" in entity
and entity["valid_from"] == entity["valid_until"]
):
valid_until_converted_datetime = datetime.datetime.strptime(
entity["valid_until"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
new_valid_until = valid_until_converted_datetime + datetime.timedelta(
seconds=1
)
valid_until_converted_string = new_valid_until.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
entity["valid_until"] = valid_until_converted_string
# Flatten
if "tasks" in entity:
del entity["tasks"]
if "status" in entity and entity["status"] is not None:
entity["x_opencti_workflow_id"] = entity["status"].get("id")
if "status" in entity:
del entity["status"]
if "objectLabel" in entity and len(entity["objectLabel"]) > 0:
entity["labels"] = []
for object_label in entity["objectLabel"]:
entity["labels"].append(object_label["value"])
if "objectLabel" in entity:
del entity["objectLabel"]
del entity["objectLabelIds"]
if (
not no_custom_attributes
and "killChainPhases" in entity
and len(entity["killChainPhases"]) > 0
):
entity["kill_chain_phases"] = []
for object_kill_chain_phase in entity["killChainPhases"]:
kill_chain_phase = {
"kill_chain_name": object_kill_chain_phase["kill_chain_name"],
"phase_name": object_kill_chain_phase["phase_name"],
"x_opencti_order": object_kill_chain_phase["x_opencti_order"],
}
entity["kill_chain_phases"].append(kill_chain_phase)
if "killChainPhases" in entity:
del entity["killChainPhases"]
del entity["killChainPhasesIds"]
if (
not no_custom_attributes
and "externalReferences" in entity
and len(entity["externalReferences"]) > 0
):
entity["external_references"] = []
for entity_external_reference in entity["externalReferences"]:
external_reference = dict()
if self.opencti.not_empty(entity_external_reference["source_name"]):
external_reference["source_name"] = entity_external_reference[
"source_name"
]
if self.opencti.not_empty(entity_external_reference["description"]):
external_reference["description"] = entity_external_reference[
"description"
]
if self.opencti.not_empty(entity_external_reference["url"]):
external_reference["url"] = entity_external_reference["url"]
if self.opencti.not_empty(entity_external_reference["hash"]):
external_reference["hash"] = entity_external_reference["hash"]
if self.opencti.not_empty(entity_external_reference["external_id"]):
external_reference["external_id"] = entity_external_reference[
"external_id"
]
if (
"importFiles" in entity_external_reference
and len(entity_external_reference["importFiles"]) > 0
):
external_reference["x_opencti_files"] = []
for file in entity_external_reference["importFiles"]:
url = (
self.opencti.api_url.replace("graphql", "storage/get/")
+ file["id"]
)
data = self.opencti.fetch_opencti_file(
url, binary=True, serialize=True
)
external_reference["x_opencti_files"].append(
{
"name": file["name"],
"data": data,
"mime_type": file["metaData"]["mimetype"],
"version": file["metaData"].get("version", None),
}
)
entity["external_references"].append(external_reference)
if "externalReferences" in entity:
del entity["externalReferences"]
del entity["externalReferencesIds"]
if "indicators" in entity:
del entity["indicators"]
del entity["indicatorsIds"]
if "hashes" in entity:
hashes = entity["hashes"]
entity["hashes"] = {}
for hash_item in hashes:
entity["hashes"][hash_item["algorithm"]] = hash_item["hash"]
# Final
entity["x_opencti_id"] = entity["id"]
if not no_custom_attributes:
entity["x_opencti_type"] = original_entity_type
entity["id"] = entity["standard_id"]
entity["type"] = entity["entity_type"].lower()
del entity["standard_id"]
del entity["entity_type"]
del entity["parent_types"]
if "created_at" in entity:
del entity["created_at"]
if "updated_at" in entity:
del entity["updated_at"]
return {k: v for k, v in entity.items() if self.opencti.not_empty(v)}
@staticmethod
[docs]
def prepare_id_filters_export(
entity_id: Union[str, List[str]], access_filter: Dict = None
) -> Dict:
"""Prepare filter configuration for entity ID-based export queries.
:param entity_id: Single entity ID or list of entity IDs to filter
:type entity_id: Union[str, List[str]]
:param access_filter: Additional access filter to combine, defaults to None
:type access_filter: Dict, optional
:return: Filter configuration dictionary for API queries
:rtype: Dict
"""
if access_filter is not None:
return {
"mode": "and",
"filterGroups": [
{
"mode": "or",
"filters": [
{
"key": "ids",
"values": (
entity_id
if isinstance(entity_id, list)
else [entity_id]
),
}
],
"filterGroups": [],
},
access_filter,
],
"filters": [],
}
else:
return {
"mode": "and",
"filterGroups": [],
"filters": [
{
"key": "ids",
"mode": "or",
"values": (
entity_id if isinstance(entity_id, list) else [entity_id]
),
}
],
}
[docs]
def prepare_export(
self,
entity: Dict,
mode: str = "simple",
access_filter: Dict = None,
no_custom_attributes: bool = False,
) -> List:
"""Prepare an entity for STIX2 export with related objects.
:param entity: Entity dictionary to prepare for export
:type entity: Dict
:param mode: Export mode - 'simple' for entity only, 'full' for entity with relations
:type mode: str
:param access_filter: Access filter for the export, defaults to None
:type access_filter: Dict, optional
:param no_custom_attributes: Whether to exclude custom attributes, defaults to False
:type no_custom_attributes: bool, optional
:return: List of STIX2 objects ready for export
:rtype: List
"""
result = []
objects_to_get = []
self._rewrite_embedded_image_uris_for_export(entity)
# CreatedByRef
if (
not no_custom_attributes
and "createdBy" in entity
and entity["createdBy"] is not None
):
created_by = self.generate_export(entity=entity["createdBy"])
if entity["type"] in STIX_CYBER_OBSERVABLE_MAPPING:
entity["x_opencti_created_by_ref"] = created_by["id"]
else:
entity["created_by_ref"] = created_by["id"]
result.append(created_by)
# Labels
if entity["type"] in STIX_CYBER_OBSERVABLE_MAPPING and "labels" in entity:
entity["x_opencti_labels"] = entity["labels"]
del entity["labels"]
if "createdBy" in entity:
del entity["createdBy"]
del entity["createdById"]
if "observables" in entity:
del entity["observables"]
del entity["observablesIds"]
if "creators" in entity:
del entity["creators"]
# DataSource
if (
not no_custom_attributes
and "dataSource" in entity
and entity["dataSource"] is not None
):
data_source = self.generate_export(entity["dataSource"])
entity["x_mitre_data_source_ref"] = data_source["id"]
result.append(data_source)
if "dataSource" in entity:
del entity["dataSource"]
del entity["dataSourceId"]
# Dates
if "first_seen" in entity and entity["first_seen"].startswith("1970"):
del entity["first_seen"]
if "start_time" in entity and entity["start_time"].startswith("1970"):
del entity["start_time"]
if "last_seen" in entity and entity["last_seen"].startswith("5138"):
del entity["last_seen"]
if "stop_time" in entity and entity["stop_time"].startswith("5138"):
del entity["stop_time"]
entity_copy = entity.copy()
if no_custom_attributes:
if "external_references" in entity:
del entity["external_references"]
for key in entity_copy.keys():
if key.startswith("x_"):
del entity[key]
entity["x_opencti_id"] = entity_copy["x_opencti_id"]
# ObjectOrganization
if (
not no_custom_attributes
and "objectOrganization" in entity
and len(entity["objectOrganization"]) > 0
):
entity["x_opencti_granted_refs"] = []
for entity_organization in entity["objectOrganization"]:
entity["x_opencti_granted_refs"].append(
entity_organization["standard_id"]
)
if "objectOrganization" in entity:
del entity["objectOrganization"]
# ObjectMarkingRefs
if (
not no_custom_attributes
and "objectMarking" in entity
and len(entity["objectMarking"]) > 0
):
entity["object_marking_refs"] = []
for entity_marking_definition in entity["objectMarking"]:
if entity_marking_definition["definition_type"] == "TLP":
created = "2017-01-20T00:00:00.000Z"
else:
created = entity_marking_definition["created"]
marking_definition = {
"type": "marking-definition",
"spec_version": SPEC_VERSION,
"id": entity_marking_definition["standard_id"],
"created": created,
"definition_type": entity_marking_definition[
"definition_type"
].lower(),
"name": entity_marking_definition["definition"],
"definition": {
entity_marking_definition["definition_type"]
.lower(): entity_marking_definition["definition"]
.lower()
.replace("tlp:", "")
},
}
result.append(marking_definition)
entity["object_marking_refs"].append(marking_definition["id"])
if "objectMarking" in entity:
del entity["objectMarking"]
del entity["objectMarkingIds"]
# ObjectRefs
if (
not no_custom_attributes
and "objects" in entity
and len(entity["objects"]) > 0
):
entity["object_refs"] = []
objects_to_get = entity["objects"] # To do differently
for entity_object in entity["objects"]:
if (
entity["type"] == "report"
and entity_object["entity_type"]
not in [
"Note",
"Report",
"Opinion",
]
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "note"
and entity_object["entity_type"]
not in [
"Note",
"Opinion",
]
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "opinion"
and entity_object["entity_type"] not in ["Opinion"]
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "observed-data"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "grouping"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "x-opencti-case-incident"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "x-opencti-feedback"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "x-opencti-case-rfi"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "x-opencti-case-rft"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
elif (
entity["type"] == "x-opencti-task"
and "stix-ref-relationship" not in entity_object["parent_types"]
):
entity["object_refs"].append(entity_object["standard_id"])
if "objects" in entity:
del entity["objects"]
del entity["objectsIds"]
# Stix Sighting Relationship
if entity["type"] == "stix-sighting-relationship":
entity["type"] = "sighting"
entity["count"] = entity["attribute_count"]
del entity["attribute_count"]
from_to_check = entity["from"]["id"]
relationships_from_filter = self.prepare_id_filters_export(
entity_id=from_to_check, access_filter=access_filter
)
x = self.opencti.opencti_stix_object_or_stix_relationship.list(
filters=relationships_from_filter
)
if len(x) > 0:
entity["sighting_of_ref"] = entity["from"]["standard_id"]
# handle from and to separately like Stix Core Relationship and call 2 requests
objects_to_get.append(
entity["from"]
) # what happen with unauthorized objects ?
to_to_check = [entity["to"]["id"]]
relationships_to_filter = self.prepare_id_filters_export(
entity_id=to_to_check, access_filter=access_filter
)
y = self.opencti.opencti_stix_object_or_stix_relationship.list(
filters=relationships_to_filter
)
if len(y) > 0:
entity["where_sighted_refs"] = [entity["to"]["standard_id"]]
objects_to_get.append(entity["to"])
del entity["from"]
del entity["to"]
# Stix Core Relationship
if "from" in entity or "to" in entity:
entity["type"] = "relationship"
if "from" in entity:
from_to_check = entity["from"]["id"]
relationships_from_filter = self.prepare_id_filters_export(
entity_id=from_to_check, access_filter=access_filter
)
x = self.opencti.opencti_stix_object_or_stix_relationship.list(
filters=relationships_from_filter
)
if len(x) > 0:
entity["source_ref"] = entity["from"]["standard_id"]
# handle from and to separately like Stix Core Relationship and call 2 requests
objects_to_get.append(
entity["from"]
) # what happen with unauthorized objects ?
del entity["from"]
if "to" in entity:
to_to_check = [entity["to"]["id"]]
relationships_to_filter = self.prepare_id_filters_export(
entity_id=to_to_check, access_filter=access_filter
)
y = self.opencti.opencti_stix_object_or_stix_relationship.list(
filters=relationships_to_filter
)
if len(y) > 0:
entity["target_ref"] = entity["to"]["standard_id"]
objects_to_get.append(entity["to"])
del entity["to"]
# Stix Domain Object
if "attribute_abstract" in entity:
entity["abstract"] = entity["attribute_abstract"]
del entity["attribute_abstract"]
# Stix Cyber Observable
if "observable_value" in entity:
del entity["observable_value"]
if "attribute_key" in entity:
entity["key"] = entity["attribute_key"]
del entity["attribute_key"]
if "attribute_date" in entity:
entity["date"] = entity["attribute_date"]
del entity["attribute_date"]
# Artifact
if entity["type"] == "artifact" and "importFiles" in entity:
first_file = entity["importFiles"][0]["id"]
url = self.opencti.api_url.replace("graphql", "storage/get/") + first_file
file = self.opencti.fetch_opencti_file(url, binary=True, serialize=True)
if file:
entity["payload_bin"] = file
# Files
if "importFiles" in entity and len(entity["importFiles"]) > 0:
entity["x_opencti_files"] = []
for file in entity["importFiles"]:
url = (
self.opencti.api_url.replace("graphql", "storage/get/") + file["id"]
)
data = self.opencti.fetch_opencti_file(url, binary=True, serialize=True)
x_opencti_file = {
"name": file["name"],
"data": data,
"mime_type": file["metaData"]["mimetype"],
"version": file["metaData"].get("version", None),
"object_marking_refs": [],
}
for file_marking_definition in file.get("objectMarking", []):
if file_marking_definition["definition_type"] == "TLP":
created = "2017-01-20T00:00:00.000Z"
else:
created = file_marking_definition["created"]
marking_definition = {
"type": "marking-definition",
"spec_version": SPEC_VERSION,
"id": file_marking_definition["standard_id"],
"created": created,
"definition_type": file_marking_definition[
"definition_type"
].lower(),
"name": file_marking_definition["definition"],
"definition": {
file_marking_definition["definition_type"]
.lower(): file_marking_definition["definition"]
.lower()
.replace("tlp:", "")
},
}
result.append(marking_definition)
x_opencti_file["object_marking_refs"].append(
marking_definition["id"]
)
entity["x_opencti_files"].append(x_opencti_file)
del entity["importFiles"]
del entity["importFilesIds"]
# StixRefRelationship
stix_nested_ref_relationships = self.opencti.stix_nested_ref_relationship.list(
fromId=entity["x_opencti_id"], filters=access_filter
)
for stix_nested_ref_relationship in stix_nested_ref_relationships:
if "standard_id" in stix_nested_ref_relationship["to"]:
# dirty fix because the sample and operating-system ref are not multiple for a Malware Analysis
# will be replaced by a proper toStix converter in the back
if not MultipleRefRelationship.has_value(
stix_nested_ref_relationship["relationship_type"]
) or (
entity["type"] == "malware-analysis"
and stix_nested_ref_relationship["relationship_type"]
in ["operating-system", "sample"]
):
key = (
stix_nested_ref_relationship["relationship_type"]
.replace("obs_", "")
.replace("-", "_")
+ "_ref"
)
entity[key] = stix_nested_ref_relationship["to"]["standard_id"]
else:
key = (
stix_nested_ref_relationship["relationship_type"]
.replace("obs_", "")
.replace("-", "_")
+ "_refs"
)
if key in entity and isinstance(entity[key], list):
entity[key].append(
stix_nested_ref_relationship["to"]["standard_id"]
)
else:
entity[key] = [
stix_nested_ref_relationship["to"]["standard_id"]
]
result.append(entity)
if mode == "simple":
if no_custom_attributes:
del entity["x_opencti_id"]
return result
elif mode == "full":
uuids = [entity["id"]]
for y in result:
uuids.append(y["id"])
# Get extra refs
for key in entity.keys():
if key.endswith("_ref"):
stix_type = entity[key].split("--")[0]
if stix_type in STIX_CYBER_OBSERVABLE_MAPPING:
objects_to_get.append(
{
"id": entity[key],
"entity_type": "Stix-Cyber-Observable",
"parent_types": ["Stix-Cyber-Observable"],
}
)
else:
objects_to_get.append(
{
"id": entity[key],
"entity_type": "Stix-Domain-Object",
"parent_types": ["Stix-Domain-Object"],
}
)
elif key.endswith("_refs"):
for value in entity[key]:
stix_type = value.split("--")[0]
if stix_type in STIX_CYBER_OBSERVABLE_MAPPING:
objects_to_get.append(
{
"id": value,
"entity_type": "Stix-Cyber-Observable",
"parent_types": ["Stix-Cyber-Observable"],
}
)
else:
objects_to_get.append(
{
"id": value,
"entity_type": "Stix-Domain-Object",
"parent_types": ["Stix-Domain-Object"],
}
)
# Get extra relations (from AND to)
stix_core_relationships = self.opencti.stix_core_relationship.list(
fromOrToId=entity["x_opencti_id"], getAll=True, filters=access_filter
)
for stix_core_relationship in stix_core_relationships:
objects_to_get.append(
stix_core_relationship["to"]
if stix_core_relationship["to"]["id"] != entity["x_opencti_id"]
else stix_core_relationship["from"]
)
relation_object_data = (
self.prepare_export( # ICI -> remove max marking ?
entity=self.generate_export(stix_core_relationship),
mode="simple",
access_filter=access_filter,
)
)
relation_object_bundle = self.filter_objects(
uuids, relation_object_data
)
uuids = uuids + [x["id"] for x in relation_object_bundle]
result = result + relation_object_bundle
# Get sighting
stix_sighting_relationships = self.opencti.stix_sighting_relationship.list(
fromOrToId=entity["x_opencti_id"], getAll=True, filters=access_filter
)
for stix_sighting_relationship in stix_sighting_relationships:
objects_to_get.append(
stix_sighting_relationship["to"]
if stix_sighting_relationship["to"]["id"] != entity["x_opencti_id"]
else stix_sighting_relationship["from"]
)
relation_object_data = (
self.prepare_export( # ICI -> remove max marking ?
entity=self.generate_export(stix_sighting_relationship),
mode="simple",
access_filter=access_filter,
)
)
relation_object_bundle = self.filter_objects(
uuids, relation_object_data
)
uuids = uuids + [x["id"] for x in relation_object_bundle]
result = result + relation_object_bundle
if no_custom_attributes:
del entity["x_opencti_id"]
# Get extra objects
for entity_object in objects_to_get:
resolve_type = entity_object["entity_type"]
if "stix-core-relationship" in entity_object["parent_types"]:
resolve_type = "stix-core-relationship"
if "stix-ref-relationship" in entity_object["parent_types"]:
resolve_type = "stix-ref-relationship"
do_read = self.get_reader(resolve_type)
query_filters = self.prepare_id_filters_export(
entity_object["id"], access_filter
)
entity_object_data = do_read(filters=query_filters)
if entity_object_data is not None:
stix_entity_object = self.prepare_export(
entity=self.generate_export(entity_object_data),
mode="simple",
access_filter=access_filter,
)
# Add to result
entity_object_bundle = self.filter_objects(
uuids, stix_entity_object
)
uuids = uuids + [x["id"] for x in entity_object_bundle]
result = result + entity_object_bundle
# Get extra reports
"""
for uuid in uuids:
if "marking-definition" not in uuid:
reports = self.opencti.opencti_stix_object_or_stix_relationship.reports(id=uuid)
for report in reports:
report_object_data = self.opencti.report.to_stix2(
entity=report,
mode="simple",
)
report_object_bundle = self.filter_objects(
uuids, report_object_data
)
uuids = uuids + [x["id"] for x in report_object_bundle]
result = result + report_object_bundle
"""
# Get notes
# for export_uuid in uuids:
# if "marking-definition" not in export_uuid:
# notes = self.opencti.opencti_stix_object_or_stix_relationship.notes(
# id=export_uuid
# )
# for note in notes:
# note_object_data = self.opencti.note.to_stix2(
# entity=note,
# mode="simple",
# )
# note_object_bundle = self.filter_objects(
# uuids, note_object_data
# )
# uuids = uuids + [x["id"] for x in note_object_bundle]
# result = result + note_object_bundle
# Refilter all the reports object refs
final_result = []
for result_entity in result:
if result_entity["type"] in [
"report",
"note",
"opinion",
"observed-data",
"grouping",
]:
if "object_refs" in result_entity:
result_entity["object_refs"] = [
k for k in result_entity["object_refs"] if k in uuids
]
final_result.append(result_entity)
else:
final_result.append(result_entity)
return final_result
else:
return []
[docs]
def get_stix_bundle_or_object_from_entity_id(
self,
entity_type: str,
entity_id: str,
mode: str = "simple",
access_filter: Dict = None,
no_custom_attributes: bool = False,
only_entity: bool = False,
) -> Dict:
"""Get a STIX2 bundle or single object from an entity ID.
:param entity_type: Type of the entity to export
:type entity_type: str
:param entity_id: ID of the entity to export
:type entity_id: str
:param mode: Export mode - 'simple' or 'full', defaults to 'simple'
:type mode: str
:param access_filter: Access filter for the export, defaults to None
:type access_filter: Dict, optional
:param no_custom_attributes: Whether to exclude custom attributes, defaults to False
:type no_custom_attributes: bool, optional
:param only_entity: If True, return only the entity object instead of a bundle
:type only_entity: bool, optional
:return: STIX2 bundle dictionary or single STIX2 object if only_entity is True
:rtype: Dict
"""
bundle = {
"type": "bundle",
"id": "bundle--" + str(uuid.uuid4()),
"objects": [],
}
do_read = self.get_reader(entity_type)
entity = do_read(id=entity_id, withFiles=(mode == "full"))
if entity is None:
self.opencti.app_logger.error(
"Cannot export entity (not found)", {"id": entity_id}
)
return bundle
entity_standard_id = entity["standard_id"]
stix_objects = self.prepare_export(
entity=self.generate_export(entity, no_custom_attributes),
mode=mode,
access_filter=access_filter,
no_custom_attributes=no_custom_attributes,
)
if stix_objects is not None:
bundle["objects"].extend(stix_objects)
self._rewrite_embedded_image_uris_in_bundle_for_export(bundle)
if only_entity:
return [e for e in bundle["objects"] if e.get("id") == entity_standard_id][
0
]
return bundle
# Please use get_stix_bundle_or_object_from_entity_id instead
@deprecated("Use get_stix_bundle_or_object_from_entity_id instead")
[docs]
def export_entity(
self,
entity_type: str,
entity_id: str,
mode: str = "simple",
access_filter: Dict = None,
no_custom_attributes: bool = False,
only_entity: bool = False,
) -> Dict:
"""Export an entity as a STIX2 bundle.
.. deprecated::
Use :meth:`get_stix_bundle_or_object_from_entity_id` instead.
:param entity_type: Type of the entity to export
:type entity_type: str
:param entity_id: ID of the entity to export
:type entity_id: str
:param mode: Export mode - 'simple' or 'full', defaults to 'simple'
:type mode: str
:param access_filter: Access filter for the export, defaults to None
:type access_filter: Dict, optional
:param no_custom_attributes: Whether to exclude custom attributes, defaults to False
:type no_custom_attributes: bool, optional
:param only_entity: If True, return only the entity object instead of a bundle
:type only_entity: bool, optional
:return: STIX2 bundle dictionary or single STIX2 object
:rtype: Dict
"""
return self.get_stix_bundle_or_object_from_entity_id(
entity_type=entity_type,
entity_id=entity_id,
mode=mode,
access_filter=access_filter,
no_custom_attributes=no_custom_attributes,
only_entity=only_entity,
)
[docs]
def export_entities_list(
self,
entity_type: str,
search: Dict = None,
filters: Dict = None,
orderBy: str = None,
orderMode: str = None,
getAll: bool = True,
withFiles: bool = False,
) -> List[Dict]:
"""List entities for export based on type and filters.
:param entity_type: Type of entities to list
:type entity_type: str
:param search: Search parameters, defaults to None
:type search: Dict, optional
:param filters: Filter parameters, defaults to None
:type filters: Dict, optional
:param orderBy: Field to order results by, defaults to None
:type orderBy: str, optional
:param orderMode: Order direction ('asc' or 'desc'), defaults to None
:type orderMode: str, optional
:param getAll: Whether to get all results, defaults to True
:type getAll: bool, optional
:param withFiles: Whether to include files in the export, defaults to False
:type withFiles: bool, optional
:return: List of entity dictionaries
:rtype: List[Dict]
"""
if IdentityTypes.has_value(entity_type):
entity_type = "Identity"
if LocationTypes.has_value(entity_type):
entity_type = "Location"
if StixCyberObservableTypes.has_value(entity_type):
entity_type = "Stix-Cyber-Observable"
if entity_type == "Container":
entity_type = "Stix-Domain-Object"
# List
lister = {
"Stix-Core-Object": self.opencti.stix_core_object.list,
"Stix-Domain-Object": self.opencti.stix_domain_object.list,
"Attack-Pattern": self.opencti.attack_pattern.list,
"Campaign": self.opencti.campaign.list,
"Channel": self.opencti.channel.list,
"Event": self.opencti.event.list,
"Note": self.opencti.note.list,
"Observed-Data": self.opencti.observed_data.list,
"Opinion": self.opencti.opinion.list,
"Report": self.opencti.report.list,
"Grouping": self.opencti.grouping.list,
"Case-Incident": self.opencti.case_incident.list,
"Feedback": self.opencti.feedback.list,
"Case-Rfi": self.opencti.case_rfi.list,
"Case-Rft": self.opencti.case_rft.list,
"Task": self.opencti.task.list,
"Course-Of-Action": self.opencti.course_of_action.list,
"Data-Component": self.opencti.data_component.list,
"Data-Source": self.opencti.data_source.list,
"Identity": self.opencti.identity.list,
"Indicator": self.opencti.indicator.list,
"Infrastructure": self.opencti.infrastructure.list,
"Intrusion-Set": self.opencti.intrusion_set.list,
"Location": self.opencti.location.list,
"Language": self.opencti.language.list,
"Malware": self.opencti.malware.list,
"Malware-Analysis": self.opencti.malware_analysis.list,
"Threat-Actor": self.opencti.threat_actor_group.list,
"Threat-Actor-Group": self.opencti.threat_actor_group.list,
"Threat-Actor-Individual": self.opencti.threat_actor_individual.list,
"Tool": self.opencti.tool.list,
"Narrative": self.opencti.narrative.list,
"Vulnerability": self.opencti.vulnerability.list,
"Incident": self.opencti.incident.list,
"Stix-Cyber-Observable": self.opencti.stix_cyber_observable.list,
"stix-sighting-relationship": self.opencti.stix_sighting_relationship.list,
"stix-core-relationship": self.opencti.stix_core_relationship.list,
}
do_list = lister.get(
entity_type, lambda **kwargs: self.unknown_type({"type": entity_type})
)
if getAll and (orderBy is None or orderBy == "_score"):
orderBy = "created_at"
if orderMode is None:
orderMode = "desc"
# noinspection PyTypeChecker
return do_list(
search=search,
filters=filters,
orderBy=orderBy,
orderMode=orderMode,
getAll=getAll,
withFiles=withFiles,
)
[docs]
def export_list(
self,
entity_type: str,
search: Dict = None,
filters: Dict = None,
order_by: str = None,
order_mode: str = None,
mode: str = "simple",
access_filter: Dict = None,
) -> Dict:
"""Export a list of entities as a STIX2 bundle.
:param entity_type: Type of entities to export
:type entity_type: str
:param search: Search parameters, defaults to None
:type search: Dict, optional
:param filters: Filter parameters, defaults to None
:type filters: Dict, optional
:param order_by: Field to order results by, defaults to None
:type order_by: str, optional
:param order_mode: Order direction ('asc' or 'desc'), defaults to None
:type order_mode: str, optional
:param mode: Export mode - 'simple' or 'full', defaults to 'simple'
:type mode: str
:param access_filter: Access filter for the export, defaults to None
:type access_filter: Dict, optional
:return: STIX2 bundle containing all exported entities
:rtype: Dict
"""
bundle = {
"type": "bundle",
"id": "bundle--" + str(uuid.uuid4()),
"objects": [],
}
filter_groups = []
if filters is not None:
filter_groups.append(filters)
if access_filter is not None:
filter_groups.append(access_filter)
export_query_filter = {
"mode": "and",
"filterGroups": filter_groups,
"filters": [],
}
entities_list = self.export_entities_list(
entity_type=entity_type,
search=search,
filters=export_query_filter,
orderBy=order_by,
orderMode=order_mode,
getAll=True,
withFiles=(mode == "full"),
)
if entities_list is not None:
uuids = []
for entity in entities_list:
entity_bundle = self.prepare_export(
entity=self.generate_export(entity),
mode=mode,
access_filter=access_filter,
)
if entity_bundle is not None:
entity_bundle_filtered = self.filter_objects(uuids, entity_bundle)
for x in entity_bundle_filtered:
uuids.append(x["id"])
bundle["objects"] = bundle["objects"] + entity_bundle_filtered
self._rewrite_embedded_image_uris_in_bundle_for_export(bundle)
return bundle
[docs]
def export_selected(
self,
entities_list: List[dict],
mode: str = "simple",
access_filter: Dict = None,
) -> Dict:
"""Export selected entities as a STIX2 bundle.
:param entities_list: List of entities to export
:type entities_list: List[dict]
:param mode: Export mode ('simple' or 'full'), defaults to 'simple'
:type mode: str
:param access_filter: Access filter for the export
:type access_filter: Dict
:return: STIX2 bundle containing exported entities
:rtype: Dict
"""
bundle = {
"type": "bundle",
"id": "bundle--" + str(uuid.uuid4()),
"objects": [],
}
uuids = []
for entity in entities_list:
entity_bundle = self.prepare_export(
entity=self.generate_export(entity),
mode=mode,
access_filter=access_filter,
)
if entity_bundle is not None:
entity_bundle_filtered = self.filter_objects(uuids, entity_bundle)
for x in entity_bundle_filtered:
uuids.append(x["id"])
bundle["objects"] = (
bundle["objects"] + entity_bundle_filtered
) # unsupported operand type(s) for +: 'dict' and 'list'
self._rewrite_embedded_image_uris_in_bundle_for_export(bundle)
return bundle
[docs]
def apply_patch_files(self, item):
"""Apply file patches to an item.
:param item: Item containing file patch operations
:type item: dict
"""
field_patch = self.opencti.get_attribute_in_extension(
"opencti_field_patch", item
)
if field_patch is None:
field_patch = item["opencti_field_patch"]
field_patch_files = next(
(op for op in field_patch if op["key"] == "x_opencti_files"), None
)
item_id = self.opencti.get_attribute_in_extension("id", item)
if item_id is None:
item_id = item["id"]
do_add_file = self.opencti.stix_domain_object.add_file
if StixCyberObservableTypes.has_value(item["type"]):
do_add_file = self.opencti.stix_cyber_observable.add_file
elif item["type"] == "external-reference":
do_add_file = self.opencti.external_reference.add_file
if field_patch_files is not None:
for file in field_patch_files["value"]:
if "data" in file:
do_add_file(
id=item_id,
file_name=file["name"],
version=file.get("version", None),
data=base64.b64decode(file["data"]),
fileMarkings=file.get("object_marking_refs", None),
mime_type=file.get("mime_type", None),
no_trigger_import=file.get("no_trigger_import", False),
embedded=file.get("embedded", False),
)
[docs]
def apply_patch(self, item):
"""Apply field patches to an item.
:param item: Item containing field patch operations
:type item: dict
"""
field_patch = self.opencti.get_attribute_in_extension(
"opencti_field_patch", item
)
if field_patch is None:
field_patch = item["opencti_field_patch"]
field_patch_without_files = [
op for op in field_patch if op["key"] != "x_opencti_files"
]
item_id = self.opencti.get_attribute_in_extension("id", item)
if item_id is None:
item_id = item["id"]
if len(field_patch_without_files) > 0:
if item["type"] == "relationship":
self.opencti.stix_core_relationship.update_field(
id=item_id, input=field_patch_without_files
)
elif item["type"] == "sighting":
self.opencti.stix_sighting_relationship.update_field(
id=item_id, input=field_patch_without_files
)
elif StixCyberObservableTypes.has_value(item["type"]):
self.opencti.stix_cyber_observable.update_field(
id=item_id, input=field_patch_without_files
)
elif item["type"] == "external-reference":
self.opencti.external_reference.update_field(
id=item_id, input=field_patch_without_files
)
elif item["type"] == "indicator":
self.opencti.indicator.update_field(
id=item_id, input=field_patch_without_files
)
elif item["type"] == "notification":
self.opencti.notification.update_field(
id=item_id, input=field_patch_without_files
)
elif item["type"] == "user":
self.opencti.user.update_field(
id=item_id, input=field_patch_without_files
)
else:
self.opencti.stix_domain_object.update_field(
id=item_id, input=field_patch_without_files
)
self.apply_patch_files(item)
[docs]
def rule_apply(self, item, bundle_id):
"""Apply a rule to an item.
:param item: Item to apply the rule to
:type item: dict
"""
rule_id = self.opencti.get_attribute_in_extension("opencti_rule", item)
if rule_id is None:
rule_id = item["opencti_rule"]
self.opencti.stix_core_object.rule_apply_async(
element_id=item["id"], rule_id=rule_id, execution_id=bundle_id
)
[docs]
def rule_clear(self, item):
"""Clear a rule from an item.
:param item: Item to clear the rule from
:type item: dict
"""
rule_id = self.opencti.get_attribute_in_extension("opencti_rule", item)
if rule_id is None:
rule_id = item["opencti_rule"]
self.opencti.stix_core_object.rule_clear(element_id=item["id"], rule_id=rule_id)
[docs]
def rules_rescan(self, item, bundle_id):
"""Rescan rules for an item.
:param item: Item to rescan rules for
:type item: dict
"""
self.opencti.stix_core_object.rules_rescan_async(
element_id=item["id"], execution_id=bundle_id
)
[docs]
def organization_share(self, item):
"""Share an item with organizations.
:param item: Item to share
:type item: dict
"""
organization_ids = self.opencti.get_attribute_in_extension(
"sharing_organization_ids", item
)
if organization_ids is None:
organization_ids = item["sharing_organization_ids"]
sharing_direct_container = self.opencti.get_attribute_in_extension(
"sharing_direct_container", item
)
if sharing_direct_container is None:
sharing_direct_container = item["sharing_direct_container"]
if item["type"] == "relationship":
self.opencti.stix_core_relationship.organization_share(
item["id"], organization_ids, sharing_direct_container
)
elif item["type"] == "sighting":
self.opencti.stix_sighting_relationship.organization_share(
item["id"], organization_ids, sharing_direct_container
)
else:
# Element is considered stix core object
self.opencti.stix_core_object.organization_share(
item["id"], organization_ids, sharing_direct_container
)
[docs]
def organization_unshare(self, item):
"""Unshare an item from organizations.
:param item: Item to unshare
:type item: dict
"""
organization_ids = self.opencti.get_attribute_in_extension(
"sharing_organization_ids", item
)
if organization_ids is None:
organization_ids = item["sharing_organization_ids"]
sharing_direct_container = self.opencti.get_attribute_in_extension(
"sharing_direct_container", item
)
if sharing_direct_container is None:
sharing_direct_container = item["sharing_direct_container"]
if item["type"] == "relationship":
self.opencti.stix_core_relationship.organization_unshare(
item["id"], organization_ids, sharing_direct_container
)
elif item["type"] == "sighting":
self.opencti.stix_sighting_relationship.organization_unshare(
item["id"], organization_ids, sharing_direct_container
)
else:
# Element is considered stix core object
self.opencti.stix_core_object.organization_unshare(
item["id"], organization_ids, sharing_direct_container
)
[docs]
def element_add_organizations(self, item):
"""Add organizations to an element.
:param item: Item to add organizations to
:type item: dict
:raises ValueError: If the operation is not compatible with the item type
"""
organization_ids = self.opencti.get_attribute_in_extension(
"organization_ids", item
)
if organization_ids is None:
organization_ids = item["organization_ids"]
if item["type"] == "user":
for organization_id in organization_ids:
self.opencti.user.add_organization(
id=item["id"], organization_id=organization_id
)
else:
raise ValueError(
"Add organizations operation not compatible with type",
{"type": item["type"]},
)
[docs]
def element_remove_organizations(self, item):
"""Remove organizations from an element.
:param item: Item to remove organizations from
:type item: dict
:raises ValueError: If the operation is not compatible with the item type
"""
organization_ids = self.opencti.get_attribute_in_extension(
"organization_ids", item
)
if organization_ids is None:
organization_ids = item["organization_ids"]
if item["type"] == "user":
for organization_id in organization_ids:
self.opencti.user.delete_organization(
id=item["id"], organization_id=organization_id
)
else:
raise ValueError(
"Remove organizations operation not compatible with type",
{"type": item["type"]},
)
[docs]
def element_add_groups(self, item):
"""Add groups to an element.
:param item: Item to add groups to
:type item: dict
:raises ValueError: If the operation is not compatible with the item type
"""
group_ids = self.opencti.get_attribute_in_extension("group_ids", item)
if group_ids is None:
group_ids = item["group_ids"]
if item["type"] == "user":
for group_id in group_ids:
self.opencti.user.add_membership(id=item["id"], group_id=group_id)
else:
raise ValueError(
"Add groups operation not compatible with type", {"type": item["type"]}
)
[docs]
def element_remove_groups(self, item):
"""Remove groups from an element.
:param item: Item to remove groups from
:type item: dict
:raises ValueError: If the operation is not compatible with the item type
"""
group_ids = self.opencti.get_attribute_in_extension("group_ids", item)
if group_ids is None:
group_ids = item["group_ids"]
if item["type"] == "user":
for group_id in group_ids:
self.opencti.user.delete_membership(id=item["id"], group_id=group_id)
else:
raise ValueError(
"Remove groups operation not compatible with type",
{"type": item["type"]},
)
[docs]
def send_email(self, item):
"""Send an email for an item.
:param item: Item to send email for
:type item: dict
:raises ValueError: If the operation is not supported for the item type
"""
template_id = self.opencti.get_attribute_in_extension("template_id", item)
if template_id is None:
template_id = item["template_id"]
if item["type"] == "user":
self.opencti.user.send_mail(id=item["id"], template_id=template_id[0])
else:
raise ValueError(
"Not supported opencti_operation for this type",
{"type": item["type"]},
)
[docs]
def enroll_playbook(self, item):
playbook_id = self.opencti.get_attribute_in_extension("playbook_id", item)
if playbook_id is None:
playbook_id = item.get("playbook_id")
entity_id = self.opencti.get_attribute_in_extension("id", item)
if entity_id is None:
entity_id = item.get("id")
if playbook_id and entity_id:
self.opencti.playbook.playbook_execute(
playbook_id=playbook_id, entity_id=entity_id
)
[docs]
def element_operation_delete(self, item, operation):
"""Delete an element.
:param item: Item to delete
:type item: dict
:param operation: Delete operation type ('delete' or 'delete_force')
:type operation: str
:raises ValueError: If the delete operation fails or helper not found
"""
# If data is stix, just use the generic stix function for deletion
force_delete = operation == "delete_force"
if item["type"] == "relationship":
self.opencti.stix_core_relationship.delete(id=item["id"])
elif item["type"] == "external-reference":
self.opencti.external_reference.delete(item["id"])
elif item["type"] == "sighting":
self.opencti.stix_sighting_relationship.delete(id=item["id"])
elif item["type"] in STIX_META_OBJECTS:
self.opencti.stix.delete(id=item["id"], force_delete=force_delete)
elif item["type"] in list(STIX_CYBER_OBSERVABLE_MAPPING.keys()):
self.opencti.stix_cyber_observable.delete(id=item["id"])
elif item["type"] in STIX_CORE_OBJECTS:
self.opencti.stix_core_object.delete(id=item["id"])
else:
# Element is not knowledge we need to use the right api
stix_helper = self.get_internal_helper().get(item["type"])
if stix_helper and hasattr(stix_helper, "delete"):
stix_helper.delete(id=item["id"], item=item)
else:
raise ValueError(
"Delete operation or not found stix helper", {"type": item["type"]}
)
[docs]
def element_remove_from_draft(self, item):
"""Remove an element from draft.
:param item: Item to remove from draft
:type item: dict
"""
if item["type"] == "relationship":
self.opencti.stix_core_relationship.remove_from_draft(id=item["id"])
elif item["type"] == "sighting":
self.opencti.stix_sighting_relationship.remove_from_draft(id=item["id"])
else:
# Element is considered stix core object
self.opencti.stix_core_object.remove_from_draft(id=item["id"])
[docs]
def apply_opencti_operation(self, item, operation, bundle_id):
"""Apply an OpenCTI operation to an item.
:param item: Item to apply the operation to
:type item: dict
:param operation: Operation to apply (delete, restore, merge, patch, etc.)
:type operation: str
:raises ValueError: If the operation is not supported
"""
if operation == "delete" or operation == "delete_force":
self.element_operation_delete(item=item, operation=operation)
elif operation == "revert_draft":
self.element_remove_from_draft(item=item)
elif operation == "restore":
self.opencti.trash.restore(item["id"])
elif operation == "enroll_playbook":
self.enroll_playbook(item=item)
elif operation == "merge":
target_id = self.opencti.get_attribute_in_extension("merge_target_id", item)
if target_id is None:
target_id = item["merge_target_id"]
source_ids = self.opencti.get_attribute_in_extension(
"merge_source_ids", item
)
if source_ids is None:
source_ids = item["merge_source_ids"]
self.opencti.stix.merge(id=target_id, object_ids=source_ids)
elif operation == "patch":
self.apply_patch(item=item)
elif operation == "pir_flag_element":
element_id = item["id"]
pir_input = item["input"]
self.opencti.pir.pir_flag_element(id=element_id, input=pir_input)
elif operation == "pir_unflag_element":
element_id = item["id"]
pir_input = item["input"]
self.opencti.pir.pir_unflag_element(id=element_id, input=pir_input)
elif operation == "rule_apply":
self.rule_apply(item=item, bundle_id=bundle_id)
elif operation == "rule_clear":
self.rule_clear(item=item)
elif operation == "rules_rescan":
self.rules_rescan(item=item, bundle_id=bundle_id)
elif operation == "share":
self.organization_share(item=item)
elif operation == "unshare":
self.organization_unshare(item=item)
elif operation == "clear_access_restriction":
self.opencti.stix_core_object.clear_access_restriction(
element_id=item["id"]
)
elif operation == "enrichment":
connector_ids = self.opencti.get_attribute_in_extension(
"connector_ids", item
)
if connector_ids is None:
connector_ids = item["connector_ids"]
self.opencti.stix_core_object.ask_enrichments(
element_id=item["id"], connector_ids=connector_ids
)
elif operation == "add_organizations":
self.element_add_organizations(item)
elif operation == "remove_organizations":
self.element_remove_organizations(item)
elif operation == "add_groups":
self.element_add_groups(item)
elif operation == "remove_groups":
self.element_remove_groups(item)
elif operation == "send_email":
self.send_email(item=item)
else:
raise ValueError(
"Not supported opencti_operation",
{"operation": operation},
)
[docs]
def import_item(
self,
item,
update: bool = False,
types: List = None,
work_id: str = None,
bundle_id: str = None,
):
"""Import a single STIX2 item into OpenCTI.
:param item: STIX2 item to import
:type item: dict
:param update: Whether to update existing data, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: List, optional
:param work_id: Work ID for tracking import progress, defaults to None
:type work_id: str, optional
:return: True on success
:rtype: bool
"""
is_inferred = self.opencti.get_attribute_in_extension("is_inferred", item)
opencti_operation = self.opencti.get_attribute_in_extension(
"opencti_operation", item
)
if is_inferred:
self.opencti.app_logger.debug(
"Ignoring inferred item during import",
{"id": item["id"], "type": item["type"]},
)
elif opencti_operation is not None:
self.apply_opencti_operation(item, opencti_operation, bundle_id)
elif "opencti_operation" in item:
self.apply_opencti_operation(item, item["opencti_operation"], bundle_id)
elif item["type"] == "relationship":
# Import relationship
self.import_relationship(item, update, types)
elif item["type"] == "sighting":
# region Resolve the to
to_ids = []
if "x_opencti_where_sighted_refs" in item:
for where_sighted_ref in item["x_opencti_where_sighted_refs"]:
to_ids.append(where_sighted_ref)
elif "where_sighted_refs" in item:
for where_sighted_ref in item["where_sighted_refs"]:
to_ids.append(where_sighted_ref)
# endregion
# region Resolve the from
from_id = None
if "x_opencti_sighting_of_ref" in item:
from_id = item["x_opencti_sighting_of_ref"]
elif "sighting_of_ref" in item:
from_id = item["sighting_of_ref"]
# endregion
# region create the sightings
if len(to_ids) > 0:
if from_id:
for to_id in to_ids:
self.import_sighting(item, from_id, to_id, update)
# Import observed_data_refs
if "observed_data_refs" in item:
for observed_data_ref in item["observed_data_refs"]:
for to_id in to_ids:
self.import_sighting(item, observed_data_ref, to_id, update)
# endregion
elif item["type"] == "label":
stix_ids = self.opencti.get_attribute_in_extension("stix_ids", item)
self.opencti.label.create(
stix_id=item["id"],
value=item["value"],
color=item["color"],
x_opencti_stix_ids=stix_ids,
update=update,
)
elif item["type"] == "vocabulary":
stix_ids = self.opencti.get_attribute_in_extension("stix_ids", item)
self.opencti.vocabulary.create(
stix_id=item["id"],
name=item["name"],
category=item["category"],
description=(item["description"] if "description" in item else None),
aliases=item["aliases"] if "aliases" in item else None,
x_opencti_stix_ids=stix_ids,
update=update,
)
elif item["type"] == "external-reference":
stix_ids = self.opencti.get_attribute_in_extension("stix_ids", item)
self.opencti.external_reference.create(
stix_id=item["id"],
source_name=(item["source_name"] if "source_name" in item else None),
url=item["url"] if "url" in item else None,
external_id=(item["external_id"] if "external_id" in item else None),
description=(item["description"] if "description" in item else None),
x_opencti_stix_ids=stix_ids,
update=update,
)
elif item["type"] == "kill-chain-phase":
stix_ids = self.opencti.get_attribute_in_extension("stix_ids", item)
self.opencti.kill_chain_phase.create(
stix_id=item["id"],
kill_chain_name=item["kill_chain_name"],
phase_name=item["phase_name"],
x_opencti_order=item["order"] if "order" in item else 0,
x_opencti_stix_ids=stix_ids,
update=update,
)
elif StixCyberObservableTypes.has_value(item["type"]):
if types is None or len(types) == 0:
self.import_observable(item, update, types)
elif item["type"] in types or "observable" in types:
self.import_observable(item, update, types)
else:
# Check the scope
if item["type"] == "marking-definition" or types is None or len(types) == 0:
self.import_object(item, update, types)
# Handle identity & location if part of the scope
elif item["type"] in types:
self.import_object(item, update, types)
else:
# Specific OpenCTI scopes
if item["type"] == "identity":
if "identity_class" in item:
if ("class" in types or "sector" in types) and item[
"identity_class"
] == "class":
self.import_object(item, update, types)
elif item["identity_class"] in types:
self.import_object(item, update, types)
elif item["type"] == "location":
if "x_opencti_location_type" in item:
if item["x_opencti_location_type"].lower() in types:
self.import_object(item, update, types)
elif (
self.opencti.get_attribute_in_extension("location_type", item)
is not None
):
if (
self.opencti.get_attribute_in_extension(
"location_type", item
).lower()
in types
):
self.import_object(item, update, types)
if work_id is not None:
self.opencti.work.report_expectation(work_id, None)
bundles_success_counter.add(1)
return True
[docs]
def import_item_with_retries(
self,
item,
update: bool = False,
types: List = None,
work_id: str = None,
bundle_id: str = None,
):
"""Import a single STIX2 item with automatic retry on failures.
Handles various error types including timeouts, lock errors, missing references,
and bad gateway errors with appropriate retry strategies.
:param item: STIX2 item to import
:type item: dict
:param update: Whether to update existing data, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: List, optional
:param work_id: Work ID for tracking import progress, defaults to None
:type work_id: str, optional
:return: None on success, the failed item on permanent failure
:rtype: dict or None
"""
processing_count = 0
worker_logger = self.opencti.logger_class("worker")
error_msg = ""
while processing_count <= MAX_PROCESSING_COUNT:
try:
self.opencti.set_retry_number(processing_count)
self.import_item(item, update, types, work_id, bundle_id)
return None
except (RequestException, Timeout):
bundles_timeout_error_counter.add(1)
error_msg = "A connection error or timeout occurred"
worker_logger.warning(error_msg)
# Platform is under heavy load: wait for unlock & retry almost indefinitely.
sleep_jitter = round(random.uniform(10, 30), 2)
time.sleep(sleep_jitter)
processing_count += 1
except Exception as ex: # pylint: disable=broad-except
error = str(ex)
error_msg = traceback.format_exc()
in_retry = processing_count < PROCESSING_COUNT
# Platform is under heavy load, wait for unlock & retry indefinitely.
if ERROR_TYPE_LOCK in error_msg:
bundles_lock_error_counter.add(1)
sleep_jitter = round(random.uniform(1, 3), 2)
time.sleep(sleep_jitter)
processing_count += 1
# Platform detects a missing reference and have to retry
elif ERROR_TYPE_MISSING_REFERENCE in error_msg and in_retry:
bundles_missing_reference_error_counter.add(1)
sleep_jitter = round(random.uniform(1, 3), 2)
time.sleep(sleep_jitter)
processing_count += 1
# A bad gateway error occurs
elif ERROR_TYPE_BAD_GATEWAY in error_msg:
worker_logger.error(
"Message reprocess for bad gateway",
{"count": processing_count},
)
bundles_bad_gateway_error_counter.add(1)
time.sleep(60)
processing_count += 1
# Request timeout error occurs
elif ERROR_TYPE_TIMEOUT in error_msg:
worker_logger.error(
"Message reprocess for request timed out",
{"count": processing_count},
)
bundles_timed_out_error_counter.add(1)
time.sleep(60)
processing_count += 1
# A draft lock error occurs
elif ERROR_TYPE_DRAFT_LOCK in error_msg:
bundles_technical_error_counter.add(1)
if work_id is not None:
self.opencti.work.api.set_draft_id("")
self.opencti.work.report_expectation(
work_id,
{
"error": error,
"source": "Draft in read only",
},
)
return None
# A work not alive error occurs
elif ERROR_TYPE_WORK_NOT_ALIVE in error_msg:
worker_logger.info(
"Message skipped because work is no longer alive",
)
return None
# Platform does not know what to do and raises an error:
# That also works for missing reference with too much execution
else:
bundles_technical_error_counter.add(1)
worker_logger.error(
"Unrecognized error during bundle import", {"error": error}
)
if work_id is not None:
item_str = json.dumps(item)
self.opencti.work.report_expectation(
work_id,
{
"error": error,
"source": (
item_str
if len(item_str) < 50000
else "Bundle too large"
),
},
)
return None
max_retry_error_message = "Max number of retries reached, please see error logs of workers for more details. Bundle will be sent to dead letter queue."
worker_logger.error(max_retry_error_message)
if work_id is not None:
item_str = json.dumps(item)
self.opencti.work.report_expectation(
work_id,
{
"error": max_retry_error_message,
"source": (
item_str if len(item_str) < 50000 else "Bundle too large"
),
},
)
item["rejection_info"] = {
"reject_reason": "MAX_RETRY",
"last_error_msg": error_msg,
}
return item
[docs]
def import_bundle(
self,
stix_bundle: Dict,
update: bool = False,
types: List = None,
work_id: str = None,
objects_max_refs: int = 0,
) -> Tuple[list, list]:
"""Import a complete STIX2 bundle into OpenCTI.
:param stix_bundle: STIX2 bundle dictionary to import
:type stix_bundle: Dict
:param update: Whether to update existing data, defaults to False
:type update: bool, optional
:param types: List of STIX2 types to filter, defaults to None
:type types: List, optional
:param work_id: Work ID for tracking import progress, defaults to None
:type work_id: str, optional
:param objects_max_refs: Maximum number of object references allowed; objects exceeding
this limit will be rejected. Set to 0 to disable the limit.
:type objects_max_refs: int, optional
:return: Tuple of (list of successfully imported elements, list of failed/too-large elements)
:rtype: Tuple[list, list]
:raises ValueError: If the bundle is not properly formatted or empty
"""
worker_logger = self.opencti.logger_class("worker")
# Check if the bundle is correctly formatted
if "type" not in stix_bundle or stix_bundle["type"] != "bundle":
raise ValueError("JSON data type is not a STIX2 bundle")
if "objects" not in stix_bundle or len(stix_bundle["objects"]) == 0:
raise ValueError("JSON data objects is empty")
event_version = (
stix_bundle["x_opencti_event_version"]
if "x_opencti_event_version" in stix_bundle
else None
)
stix2_splitter = OpenCTIStix2Splitter()
_, incompatible_elements, bundles = (
stix2_splitter.split_bundle_with_expectations(
stix_bundle, False, event_version
)
)
# Report every element ignored during bundle splitting
if work_id is not None:
for incompatible_element in incompatible_elements:
self.opencti.work.report_expectation(
work_id,
{
"error": "Incompatible element in bundle",
"source": "Element "
+ incompatible_element["id"]
+ " is incompatible and couldn't be processed",
},
)
# Import every element in a specific order
imported_elements = []
too_large_elements_bundles = []
for bundle in bundles:
bundle_id = bundle["id"]
for item in bundle["objects"]:
# If item is considered too large, meaning that it has a number of refs higher than inputted objects_max_refs, do not import it
nb_refs = OpenCTIStix2Utils.compute_object_refs_number(item)
if 0 < objects_max_refs <= nb_refs:
too_large_element_message = "Too large element in bundle"
worker_logger.warning(too_large_element_message)
item["rejection_info"] = {
"reject_reason": "ELEMENT_TOO_LARGE",
"objects_max_refs": objects_max_refs,
}
self.opencti.work.report_expectation(
work_id,
{
"error": too_large_element_message,
"source": "Element "
+ item["id"]
+ " is too large and couldn't be processed",
},
)
too_large_elements_bundles.append(item)
else:
failed_item = self.import_item_with_retries(
item, update, types, work_id, bundle_id
)
if failed_item is not None:
too_large_elements_bundles.append(failed_item)
else:
imported_elements.append(
{"id": item["id"], "type": item["type"]}
)
return imported_elements, too_large_elements_bundles
@staticmethod
[docs]
def put_attribute_in_extension(
stix_object, extension_id, key, value, multiple=False
) -> any:
"""Add or update an attribute in a STIX object's extension.
:param stix_object: STIX object to modify
:type stix_object: dict
:param extension_id: ID of the extension to add the attribute to
:type extension_id: str
:param key: Attribute key name
:type key: str
:param value: Attribute value to set
:type value: any
:param multiple: If True, append value to a list; if False, replace the value
:type multiple: bool
:return: Modified STIX object
:rtype: dict
"""
if ("x_opencti_" + key) in stix_object:
del stix_object["x_opencti_" + key]
if ("x_mitre_" + key) in stix_object:
del stix_object["x_mitre_" + key]
if "extensions" not in stix_object:
stix_object["extensions"] = {}
if extension_id not in stix_object["extensions"]:
stix_object["extensions"][extension_id] = {}
if key in stix_object["extensions"][extension_id]:
if multiple:
stix_object["extensions"][extension_id][key].append(value)
else:
stix_object["extensions"][extension_id][key] = value
else:
if multiple:
stix_object["extensions"][extension_id][key] = [value]
else:
stix_object["extensions"][extension_id][key] = value
return stix_object