Source code for pycti.api.opencti_api_client

# coding: utf-8
import atexit
import base64
import datetime
import io
import json
import os
import re
import shutil
import signal
import tempfile
import threading
from typing import Any, Dict, Optional, Tuple, Union

import magic
import requests

from pycti import __version__
from pycti.api.opencti_api_connector import OpenCTIApiConnector
from pycti.api.opencti_api_draft import OpenCTIApiDraft
from pycti.api.opencti_api_internal_file import OpenCTIApiInternalFile
from pycti.api.opencti_api_notification import OpenCTIApiNotification
from pycti.api.opencti_api_pir import OpenCTIApiPir
from pycti.api.opencti_api_playbook import OpenCTIApiPlaybook
from pycti.api.opencti_api_public_dashboard import OpenCTIApiPublicDashboard
from pycti.api.opencti_api_trash import OpenCTIApiTrash
from pycti.api.opencti_api_work import OpenCTIApiWork
from pycti.api.opencti_api_workspace import OpenCTIApiWorkspace
from pycti.entities.opencti_attack_pattern import AttackPattern
from pycti.entities.opencti_campaign import Campaign
from pycti.entities.opencti_capability import Capability
from pycti.entities.opencti_case_incident import CaseIncident
from pycti.entities.opencti_case_rfi import CaseRfi
from pycti.entities.opencti_case_rft import CaseRft
from pycti.entities.opencti_channel import Channel
from pycti.entities.opencti_course_of_action import CourseOfAction
from pycti.entities.opencti_data_component import DataComponent
from pycti.entities.opencti_data_source import DataSource
from pycti.entities.opencti_event import Event
from pycti.entities.opencti_external_reference import ExternalReference
from pycti.entities.opencti_feedback import Feedback
from pycti.entities.opencti_group import Group
from pycti.entities.opencti_grouping import Grouping
from pycti.entities.opencti_identity import Identity
from pycti.entities.opencti_incident import Incident
from pycti.entities.opencti_indicator import Indicator
from pycti.entities.opencti_infrastructure import Infrastructure
from pycti.entities.opencti_intrusion_set import IntrusionSet
from pycti.entities.opencti_kill_chain_phase import KillChainPhase
from pycti.entities.opencti_label import Label
from pycti.entities.opencti_language import Language
from pycti.entities.opencti_location import Location
from pycti.entities.opencti_malware import Malware
from pycti.entities.opencti_malware_analysis import MalwareAnalysis
from pycti.entities.opencti_marking_definition import MarkingDefinition
from pycti.entities.opencti_narrative import Narrative
from pycti.entities.opencti_note import Note
from pycti.entities.opencti_observed_data import ObservedData
from pycti.entities.opencti_opinion import Opinion
from pycti.entities.opencti_report import Report
from pycti.entities.opencti_role import Role
from pycti.entities.opencti_security_coverage import SecurityCoverage
from pycti.entities.opencti_settings import Settings
from pycti.entities.opencti_stix import Stix
from pycti.entities.opencti_stix_core_object import StixCoreObject
from pycti.entities.opencti_stix_core_relationship import StixCoreRelationship
from pycti.entities.opencti_stix_cyber_observable import StixCyberObservable
from pycti.entities.opencti_stix_domain_object import StixDomainObject
from pycti.entities.opencti_stix_nested_ref_relationship import (
    StixNestedRefRelationship,
)
from pycti.entities.opencti_stix_object_or_stix_relationship import (
    StixObjectOrStixRelationship,
)
from pycti.entities.opencti_stix_sighting_relationship import StixSightingRelationship
from pycti.entities.opencti_task import Task
from pycti.entities.opencti_threat_actor import ThreatActor
from pycti.entities.opencti_threat_actor_group import ThreatActorGroup
from pycti.entities.opencti_threat_actor_individual import ThreatActorIndividual
from pycti.entities.opencti_tool import Tool
from pycti.entities.opencti_user import User
from pycti.entities.opencti_vocabulary import Vocabulary
from pycti.entities.opencti_vulnerability import Vulnerability
from pycti.utils.opencti_logger import logger
from pycti.utils.opencti_stix2 import OpenCTIStix2
from pycti.utils.opencti_stix2_utils import OpenCTIStix2Utils

# Global singleton variables for proxy certificate management
_PROXY_CERT_BUNDLE = None
_PROXY_CERT_DIR = None
_PROXY_CERT_LOCK = threading.Lock()
_PROXY_SIGNAL_HANDLERS_REGISTERED = False


[docs] def build_request_headers(token: str, custom_headers: str, app_logger, provider: str): """Build request headers for OpenCTI API requests. :param token: the API authentication token :type token: str :param custom_headers: custom headers in format "header01:value;header02:value" :type custom_headers: str :param app_logger: the application logger instance :type app_logger: logging.Logger :param provider: the provider string for User-Agent header :type provider: str :return: dictionary of request headers :rtype: dict """ pycti_user_agent = "pycti/" + __version__ if provider is not None: pycti_user_agent += " " + provider headers_dict = { "User-Agent": pycti_user_agent, "Authorization": "Bearer " + token, } # Build and add custom headers if custom_headers is not None: for header_pair in custom_headers.strip().split(";"): if header_pair: # Skip empty header pairs try: key, value = header_pair.split(":", 1) headers_dict[key.strip()] = value.strip() except ValueError: app_logger.warning( "Ignored invalid header pair", {"header_pair": header_pair} ) return headers_dict
[docs] class File: """File object for OpenCTI file uploads. Represents a file to be uploaded via the OpenCTI API. :param name: the filename :type name: str :param data: the file content (string or bytes) :type data: str or bytes :param mime: the MIME type of the file, defaults to "text/plain" :type mime: str, optional """
[docs] def __init__(self, name, data, mime="text/plain"): """Initialize the File instance. :param name: the filename :type name: str :param data: the file content :type data: str or bytes :param mime: the MIME type of the file (default: "text/plain") :type mime: str """
[docs] self.name = name
[docs] self.data = data
[docs] self.mime = mime
[docs] class OpenCTIApiClient: """Main API client for OpenCTI :param url: OpenCTI API url :type url: str :param token: OpenCTI API token :type token: str :param log_level: log level for the client :type log_level: str, optional :param ssl_verify: Requiring the requests to verify the TLS certificate at the server. :type ssl_verify: bool, str, optional :param proxies: proxy configuration with "http" and "https" keys (e.g., {"http": "http://my_proxy:8080", "https": "http://my_proxy:8080"}) :type proxies: dict, optional :param json_logging: format the logs as json if set to True :type json_logging: bool, optional :param bundle_send_to_queue: if bundle will be sent to queue :type bundle_send_to_queue: bool, optional :param cert: If String, file path to pem file. If Tuple, a ('path_to_cert.crt', 'path_to_key.key') pair representing the certificate and the key. :type cert: str, tuple, optional :param custom_headers: Add custom headers to use with the graphql queries :type custom_headers: str, optional must in the format header01:value;header02:value :param perform_health_check: if client init must check the api access :type perform_health_check: bool, optional :param requests_timeout: define the timeout for API requests in seconds :type requests_timeout: int, optional :param provider: define client provider, and is used to specify it in requests user agent header :type provider: string, optional """
[docs] def __init__( self, url: str, token: str, log_level: str = "info", ssl_verify: Union[bool, str] = False, proxies: Union[Dict[str, str], None] = None, json_logging: bool = False, bundle_send_to_queue: bool = True, cert: Union[str, Tuple[str, str], None] = None, custom_headers: Optional[str] = None, perform_health_check: bool = True, requests_timeout: int = 300, provider: Optional[str] = None, ): """Initialize the OpenCTIApiClient instance. :param url: OpenCTI platform URL :type url: str :param token: OpenCTI API authentication token :type token: str :param log_level: logging level (default: "info") :type log_level: str :param ssl_verify: SSL certificate verification setting :type ssl_verify: Union[bool, str] :param proxies: proxy configuration dictionary with "http" and "https" keys :type proxies: Dict[str, str] or None :param json_logging: whether to format logs as JSON (default: False) :type json_logging: bool :param bundle_send_to_queue: whether bundles are sent to queue (default: True) :type bundle_send_to_queue: bool :param cert: client certificate path or tuple of (cert, key) paths :type cert: str, tuple, or None :param custom_headers: custom headers in format "header01:value;header02:value" :type custom_headers: str or None :param perform_health_check: whether to check API access on init (default: True) :type perform_health_check: bool :param requests_timeout: timeout for API requests in seconds (default: 300) :type requests_timeout: int :param provider: client provider for User-Agent header (format: provider/version) :type provider: str or None :raises ValueError: If URL or token is missing or invalid """ # Check configuration
[docs] self.bundle_send_to_queue = bundle_send_to_queue
[docs] self.ssl_verify = ssl_verify
[docs] self.cert = cert
[docs] self.proxies = proxies
if url is None or len(url) == 0: raise ValueError("An URL must be set") if token is None or len(token) == 0 or token == "ChangeMe": raise ValueError("A TOKEN must be set") # Configure logger
[docs] self.logger_class = logger(log_level.upper(), json_logging)
[docs] self.app_logger = self.logger_class("api")
[docs] self.admin_logger = self.logger_class("admin")
# Setup proxy certificates if provided self._setup_proxy_certificates() # Define API
[docs] self.api_token = token
[docs] self.api_url = url.rstrip("/") + "/graphql"
if provider is not None: provider_pattern_checker = re.compile( r"^[A-Za-z]+\/\d+(?:\.[a-z]*\d+){0,}$" ) if not provider_pattern_checker.match(provider): raise ValueError( "Provider format is incorrect: format has to be {provider}/{provider_version}, e.g. client/4.5, company_name/1.4.6..." )
[docs] self.provider = provider
[docs] self.request_headers = build_request_headers( token, custom_headers, self.app_logger, provider )
[docs] self.session = requests.session()
[docs] self.session_requests_timeout = requests_timeout
# Define the dependencies
[docs] self.work = OpenCTIApiWork(self)
[docs] self.notification = OpenCTIApiNotification(self)
[docs] self.trash = OpenCTIApiTrash(self)
[docs] self.draft = OpenCTIApiDraft(self)
[docs] self.workspace = OpenCTIApiWorkspace(self)
[docs] self.public_dashboard = OpenCTIApiPublicDashboard(self)
[docs] self.playbook = OpenCTIApiPlaybook(self)
[docs] self.connector = OpenCTIApiConnector(self)
[docs] self.stix2 = OpenCTIStix2(self)
[docs] self.pir = OpenCTIApiPir(self)
[docs] self.internal_file = OpenCTIApiInternalFile(self)
[docs] self.file = File # File class for creating upload objects
# Define the entities
[docs] self.vocabulary = Vocabulary(self)
[docs] self.label = Label(self)
[docs] self.marking_definition = MarkingDefinition(self)
[docs] self.external_reference = ExternalReference(self)
[docs] self.kill_chain_phase = KillChainPhase(self)
[docs] self.opencti_stix_object_or_stix_relationship = StixObjectOrStixRelationship( self )
[docs] self.stix = Stix(self)
[docs] self.stix_domain_object = StixDomainObject(self)
[docs] self.stix_core_object = StixCoreObject(self)
[docs] self.stix_cyber_observable = StixCyberObservable(self)
[docs] self.stix_core_relationship = StixCoreRelationship(self)
[docs] self.stix_sighting_relationship = StixSightingRelationship(self)
[docs] self.stix_nested_ref_relationship = StixNestedRefRelationship(self)
[docs] self.identity = Identity(self)
[docs] self.event = Event(self)
[docs] self.location = Location(self)
[docs] self.threat_actor = ThreatActor(self)
[docs] self.threat_actor_group = ThreatActorGroup(self)
[docs] self.threat_actor_individual = ThreatActorIndividual(self)
[docs] self.intrusion_set = IntrusionSet(self)
[docs] self.infrastructure = Infrastructure(self)
[docs] self.campaign = Campaign(self)
[docs] self.case_incident = CaseIncident(self)
[docs] self.feedback = Feedback(self)
[docs] self.case_rfi = CaseRfi(self)
[docs] self.case_rft = CaseRft(self)
[docs] self.task = Task(self)
[docs] self.incident = Incident(self)
[docs] self.malware = Malware(self)
[docs] self.malware_analysis = MalwareAnalysis(self)
[docs] self.tool = Tool(self)
[docs] self.channel = Channel(self)
[docs] self.narrative = Narrative(self)
[docs] self.language = Language(self)
[docs] self.vulnerability = Vulnerability(self)
[docs] self.security_coverage = SecurityCoverage(self)
[docs] self.attack_pattern = AttackPattern(self)
[docs] self.course_of_action = CourseOfAction(self)
[docs] self.data_component = DataComponent(self)
[docs] self.data_source = DataSource(self)
[docs] self.report = Report(self)
[docs] self.note = Note(self)
[docs] self.observed_data = ObservedData(self)
[docs] self.opinion = Opinion(self)
[docs] self.grouping = Grouping(self)
[docs] self.indicator = Indicator(self)
# Admin functionality
[docs] self.capability = Capability(self)
[docs] self.role = Role(self)
[docs] self.group = Group(self)
[docs] self.user = User(self)
[docs] self.settings = Settings(self)
# Keep track of draft context
[docs] self.draft_id = ""
# Check if openCTI is available if perform_health_check and not self.health_check(): raise ValueError( "OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..." )
def _setup_proxy_certificates(self): """Setup HTTPS proxy certificates from environment variable. Detects HTTPS_CA_CERTIFICATES environment variable and combines proxy certificates with system certificates for SSL verification. Supports both inline certificate content and file paths. Uses a singleton pattern to ensure only one certificate bundle is created across all instances, avoiding resource leaks and conflicts. """ global _PROXY_CERT_BUNDLE, _PROXY_CERT_DIR, _PROXY_SIGNAL_HANDLERS_REGISTERED https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") if not https_ca_certificates: return # Thread-safe check and setup with _PROXY_CERT_LOCK: # If already configured, reuse existing bundle if _PROXY_CERT_BUNDLE is not None: self.ssl_verify = _PROXY_CERT_BUNDLE self.app_logger.debug( "Reusing existing proxy certificate bundle", {"cert_bundle": _PROXY_CERT_BUNDLE}, ) return # First initialization - create the certificate bundle try: # Create secure temporary directory cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path cert_content = self._get_certificate_content(https_ca_certificates) # Write proxy certificate to temp file proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") with open(proxy_cert_file, "w") as f: f.write(cert_content) # Find system certificates system_cert_paths = [ "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS "/etc/ssl/cert.pem", # Alpine/BSD ] # Create combined certificate bundle combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") with open(combined_cert_file, "w") as combined: # Add system certificates first for system_path in system_cert_paths: if os.path.exists(system_path): with open(system_path, "r") as sys_certs: combined.write(sys_certs.read()) combined.write("\n") break # Add proxy certificate combined.write(cert_content) # Update global singleton variables _PROXY_CERT_BUNDLE = combined_cert_file _PROXY_CERT_DIR = cert_dir self.ssl_verify = combined_cert_file # Set environment variables for urllib and other libraries os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file os.environ["SSL_CERT_FILE"] = combined_cert_file # Register cleanup handlers only once atexit.register(_cleanup_proxy_certificates) # Register signal handlers only once if not _PROXY_SIGNAL_HANDLERS_REGISTERED: signal.signal(signal.SIGTERM, _signal_handler_proxy_cleanup) signal.signal(signal.SIGINT, _signal_handler_proxy_cleanup) _PROXY_SIGNAL_HANDLERS_REGISTERED = True self.app_logger.info( "Proxy certificates configured", {"cert_bundle": combined_cert_file}, ) except Exception as e: self.app_logger.error( "Failed to setup proxy certificates", {"error": str(e)} ) raise def _get_certificate_content(self, https_ca_certificates): """Extract certificate content from environment variable. Supports both inline certificate content (PEM format) and file paths. :param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var :type https_ca_certificates: str :return: Certificate content in PEM format :rtype: str :raises ValueError: If the certificate content is invalid or cannot be read """ # Strip whitespace once at the beginning stripped_https_ca_certificates = https_ca_certificates.strip() # Check if it's inline certificate content (starts with PEM header) if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"): self.app_logger.debug( "HTTPS_CA_CERTIFICATES contains inline certificate content" ) return https_ca_certificates # Check if it's a file path if os.path.isfile(stripped_https_ca_certificates): cert_file_path = stripped_https_ca_certificates try: with open(cert_file_path, "r") as f: cert_content = f.read() # Validate it's actually a certificate if "-----BEGIN CERTIFICATE-----" in cert_content: self.app_logger.debug( "HTTPS_CA_CERTIFICATES contains valid certificate file path", {"file_path": cert_file_path}, ) return cert_content else: raise ValueError( f"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate: {cert_file_path}" ) except ValueError: # Re-raise ValueError from certificate validation raise except Exception as e: raise ValueError( f"Failed to read certificate file at {cert_file_path}: {str(e)}" ) # Neither inline content nor valid file path raise ValueError( f"HTTPS_CA_CERTIFICATES is not a valid certificate or file path: {https_ca_certificates[:50]}..." )
[docs] def set_applicant_id_header(self, applicant_id): """Set the applicant ID header for impersonation. :param applicant_id: the ID of the user to impersonate :type applicant_id: str """ self.request_headers["opencti-applicant-id"] = applicant_id
[docs] def set_playbook_id_header(self, playbook_id): """Set the playbook ID header for tracking playbook execution. :param playbook_id: the ID of the playbook being executed :type playbook_id: str """ self.request_headers["opencti-playbook-id"] = playbook_id
[docs] def set_event_id(self, event_id): """Set the event ID header for event tracking. :param event_id: the ID of the event :type event_id: str """ self.request_headers["opencti-event-id"] = event_id
[docs] def get_draft_id(self): """Get the current draft ID. :return: the current draft ID or empty string if not set :rtype: str """ if self.draft_id is None: return "" return self.draft_id
[docs] def set_draft_id(self, draft_id): """Set the draft ID header for draft mode operations. :param draft_id: the ID of the draft workspace :type draft_id: str """ self.draft_id = draft_id self.request_headers["opencti-draft-id"] = draft_id
[docs] def set_work_id(self, work_id): """Set the work ID header for work validation :param work_id: the ID of the work :type work_id: str """ self.request_headers["opencti-work-id"] = work_id
[docs] def set_synchronized_upsert_header(self, synchronized): """Set the synchronized upsert header. :param synchronized: whether upsert should be synchronized :type synchronized: bool """ self.request_headers["synchronized-upsert"] = ( "true" if synchronized is True else "false" )
[docs] def set_previous_standard_header(self, previous_standard): """Set the previous standard header for update operations. :param previous_standard: the previous standard ID :type previous_standard: str """ self.request_headers["previous-standard"] = previous_standard
[docs] def get_request_headers(self, hide_token=True): """Get a copy of current request headers. :param hide_token: if True, masks the Authorization token with asterisks :type hide_token: bool :return: copy of request headers :rtype: dict """ request_headers_copy = self.request_headers.copy() if hide_token and "Authorization" in request_headers_copy: request_headers_copy["Authorization"] = "*****" return request_headers_copy
[docs] def set_retry_number(self, retry_number): """Set the retry number header for tracking retries. :param retry_number: the current retry attempt number, or None to clear :type retry_number: int or None """ self.request_headers["opencti-retry-number"] = ( "" if retry_number is None else str(retry_number) )
def _extract_files(self, obj, path_prefix=""): """Recursively extract File objects from nested dictionaries. :param obj: the object to search for File objects :type obj: any :param path_prefix: the current path prefix for nested keys :type path_prefix: str :return: tuple of (cleaned_obj, files_vars) where cleaned_obj has Files replaced with None :rtype: tuple """ if isinstance(obj, File): return None, [{"key": path_prefix, "file": obj, "multiple": False}] if ( isinstance(obj, list) and len(obj) > 0 and all(map(lambda x: isinstance(x, File), obj)) ): return [None] * len(obj), [ {"key": path_prefix, "file": obj, "multiple": True} ] if isinstance(obj, dict): cleaned = {} files_vars = [] for key, val in obj.items(): new_path = f"{path_prefix}.{key}" if path_prefix else key cleaned_val, nested_files = self._extract_files(val, new_path) cleaned[key] = cleaned_val files_vars.extend(nested_files) return cleaned, files_vars if isinstance(obj, list): cleaned = [] files_vars = [] for i, item in enumerate(obj): new_path = f"{path_prefix}.{i}" if path_prefix else str(i) cleaned_item, nested_files = self._extract_files(item, new_path) cleaned.append(cleaned_item) files_vars.extend(nested_files) return cleaned, files_vars return obj, []
[docs] def query(self, query, variables=None, disable_impersonate=False): """Submit a query to the OpenCTI GraphQL API. :param query: GraphQL query string :type query: str :param variables: GraphQL query variables, defaults to {} :type variables: dict, optional :param disable_impersonate: removes impersonate header if set to True, defaults to False :type disable_impersonate: bool, optional :return: returns the response JSON content :rtype: dict :raises ValueError: if the API returns an error or non-200 status code """ variables = variables or {} # Implementation of spec https://github.com/jaydenseric/graphql-multipart-request-spec # Support for single or multiple upload # Batching or mixed upload or not supported # Recursively extract File objects from nested dictionaries query_var, files_vars = self._extract_files(variables) query_headers = self.request_headers.copy() if disable_impersonate and "opencti-applicant-id" in query_headers: del query_headers["opencti-applicant-id"] # If yes, transform variable (file to null) and create multipart query if len(files_vars) > 0: multipart_data = { "operations": json.dumps({"query": query, "variables": query_var}) } # Build the multipart map map_index = 0 file_vars = {} for file_var_item in files_vars: is_multiple_files = file_var_item["multiple"] var_name = "variables." + file_var_item["key"] if is_multiple_files: # [(var_name + "." + i)] if is_multiple_files else for _ in file_var_item["file"]: file_vars[str(map_index)] = [var_name + "." + str(map_index)] map_index += 1 else: file_vars[str(map_index)] = [var_name] map_index += 1 multipart_data["map"] = json.dumps(file_vars) # Add the files file_index = 0 multipart_files = [] for file_var_item in files_vars: files = file_var_item["file"] is_multiple_files = file_var_item["multiple"] if is_multiple_files: for file in files: if isinstance(file.data, str): file_multi = ( str(file_index), ( file.name, io.BytesIO(file.data.encode("utf-8", "replace")), file.mime, ), ) else: file_multi = ( str(file_index), (file.name, file.data, file.mime), ) multipart_files.append(file_multi) file_index += 1 else: if isinstance(files.data, str): file_multi = ( str(file_index), ( files.name, io.BytesIO(files.data.encode("utf-8", "replace")), files.mime, ), ) else: file_multi = ( str(file_index), (files.name, files.data, files.mime), ) multipart_files.append(file_multi) file_index += 1 # Send the multipart request r = self.session.post( self.api_url, data=multipart_data, files=multipart_files, headers=query_headers, verify=self.ssl_verify, cert=self.cert, proxies=self.proxies, timeout=self.session_requests_timeout, ) # If no else: r = self.session.post( self.api_url, json={"query": query, "variables": variables}, headers=query_headers, verify=self.ssl_verify, cert=self.cert, proxies=self.proxies, timeout=self.session_requests_timeout, ) # Build response if r.status_code == 200: result = r.json() if "errors" in result: main_error = result["errors"][0] error_name = ( main_error["name"] if "name" in main_error else main_error["message"] ) error_detail = { "name": error_name, "error_message": main_error["message"], } meta_data = main_error["data"] if "data" in main_error else {} # Prevent logging of input as bundle is logged differently if meta_data.get("input") is not None: del meta_data["input"] value_error = {**error_detail, **meta_data} raise ValueError(value_error) else: return result else: raise ValueError(r.text)
[docs] def fetch_opencti_file(self, fetch_uri, binary=False, serialize=False): """Get file from the OpenCTI API. :param fetch_uri: download URI to use :type fetch_uri: str :param binary: if True, returns raw bytes; if False, returns text, defaults to False :type binary: bool, optional :param serialize: if True, returns base64-encoded content, defaults to False :type serialize: bool, optional :return: returns either the file content as text, bytes, base64-encoded string, or None on failure :rtype: str, bytes, or None """ try: r = self.session.get( fetch_uri, headers=self.request_headers, verify=self.ssl_verify, cert=self.cert, proxies=self.proxies, timeout=self.session_requests_timeout, ) # Check if request was successful if not r.ok: self.app_logger.warning( "Failed to fetch file", {"uri": fetch_uri, "status_code": r.status_code}, ) return None if binary: if serialize: return base64.b64encode(r.content).decode("utf-8") return r.content if serialize: return base64.b64encode(r.text.encode("utf-8")).decode("utf-8") return r.text except Exception as e: self.app_logger.warning( "Error fetching file", {"uri": fetch_uri, "error": str(e)} ) return None
[docs] def health_check(self): """Submit an example request to the OpenCTI API. :return: returns True if the health check has been successful :rtype: bool """ try: self.app_logger.info("Health check (platform version)...") test = self.query(""" query healthCheck { about { version } } """) if test is not None: return True except Exception as err: # pylint: disable=broad-except self.app_logger.error(str(err)) return False return False
[docs] def get_logs_worker_config(self): """Get the logs worker configuration from the OpenCTI platform. :return: the logs worker configuration including Elasticsearch settings :rtype: dict """ self.app_logger.info("Getting logs worker config...") query = """ query LogsWorkerConfig { logsWorkerConfig { elasticsearch_url elasticsearch_proxy elasticsearch_index elasticsearch_username elasticsearch_password elasticsearch_api_key elasticsearch_ssl_reject_unauthorized } } """ result = self.query(query) return result["data"]["logsWorkerConfig"]
[docs] def not_empty(self, value): """Check if a value is empty for str, list and int. :param value: value to check :type value: str or list or int or float or bool or datetime.date :return: returns True if the value is one of the supported types and not empty :rtype: bool """ if value is not None: if isinstance(value, bool): return True if isinstance(value, datetime.date): return True if isinstance(value, str): return len(value) > 0 if isinstance(value, dict): return bool(value) if isinstance(value, list): is_not_empty = False for v in value: if len(v) > 0: is_not_empty = True return is_not_empty if isinstance(value, float): return True if isinstance(value, int): return True return False return False
[docs] def process_multiple(self, data: dict, with_pagination=False) -> Union[dict, list]: """Process data returned by the OpenCTI API with multiple entities. :param data: data to process :type data: dict :param with_pagination: whether to use pagination with the API, defaults to False :type with_pagination: bool, optional :return: returns either a dict or list with the processed entities :rtype: dict or list """ if with_pagination: result = {"entities": [], "pagination": {}} else: result = [] if data is None: return result # Data can be multiple in edges or directly. # -- When data is directly a listing if isinstance(data, list): for row in data: if with_pagination: result["entities"].append(self.process_multiple_fields(row)) else: result.append(self.process_multiple_fields(row)) return result # -- When data is wrapped in edges for edge in ( data["edges"] if "edges" in data and data["edges"] is not None else [] ): row = edge["node"] if with_pagination: result["entities"].append(self.process_multiple_fields(row)) else: result.append(self.process_multiple_fields(row)) # -- Add page info if required if with_pagination and "pageInfo" in data: result["pagination"] = data["pageInfo"] return result
[docs] def process_multiple_ids(self, data) -> list: """Process data returned by the OpenCTI API with multiple ids. :param data: data to process :type data: list :return: returns a list of ids :rtype: list """ result = [] if data is None: return result if isinstance(data, list): for d in data: if isinstance(d, dict) and "id" in d: result.append(d["id"]) return result
[docs] def process_multiple_fields(self, data): """Process data returned by the OpenCTI API with multiple fields. :param data: data to process :type data: dict :return: returns the data dict with all fields processed :rtype: dict """ # Handle process_multiple_fields specific case attribute = OpenCTIStix2Utils.retrieveClassForMethod( self, data, "entity_type", "process_multiple_fields" ) if attribute is not None: data = attribute.process_multiple_fields(data) if data is None: return data if "createdBy" in data and data["createdBy"] is not None: data["createdById"] = data["createdBy"]["id"] if "objectMarking" in data["createdBy"]: data["createdBy"]["objectMarking"] = self.process_multiple( data["createdBy"]["objectMarking"] ) data["createdBy"]["objectMarkingIds"] = self.process_multiple_ids( data["createdBy"]["objectMarking"] ) if "objectLabel" in data["createdBy"]: data["createdBy"]["objectLabel"] = self.process_multiple( data["createdBy"]["objectLabel"] ) data["createdBy"]["objectLabelIds"] = self.process_multiple_ids( data["createdBy"]["objectLabel"] ) else: data["createdById"] = None if "objectMarking" in data: data["objectMarking"] = self.process_multiple(data["objectMarking"]) data["objectMarkingIds"] = self.process_multiple_ids(data["objectMarking"]) if "objectLabel" in data: data["objectLabel"] = self.process_multiple(data["objectLabel"]) data["objectLabelIds"] = self.process_multiple_ids(data["objectLabel"]) if "reports" in data: data["reports"] = self.process_multiple(data["reports"]) data["reportsIds"] = self.process_multiple_ids(data["reports"]) if "notes" in data: data["notes"] = self.process_multiple(data["notes"]) data["notesIds"] = self.process_multiple_ids(data["notes"]) if "opinions" in data: data["opinions"] = self.process_multiple(data["opinions"]) data["opinionsIds"] = self.process_multiple_ids(data["opinions"]) if "observedData" in data: data["observedData"] = self.process_multiple(data["observedData"]) data["observedDataIds"] = self.process_multiple_ids(data["observedData"]) if "killChainPhases" in data: data["killChainPhases"] = self.process_multiple(data["killChainPhases"]) data["killChainPhasesIds"] = self.process_multiple_ids( data["killChainPhases"] ) if "externalReferences" in data: data["externalReferences"] = self.process_multiple( data["externalReferences"] ) data["externalReferencesIds"] = self.process_multiple_ids( data["externalReferences"] ) if "objects" in data: data["objects"] = self.process_multiple(data["objects"]) data["objectsIds"] = self.process_multiple_ids(data["objects"]) if "observables" in data: data["observables"] = self.process_multiple(data["observables"]) data["observablesIds"] = self.process_multiple_ids(data["observables"]) if "stixCoreRelationships" in data: data["stixCoreRelationships"] = self.process_multiple( data["stixCoreRelationships"] ) data["stixCoreRelationshipsIds"] = self.process_multiple_ids( data["stixCoreRelationships"] ) if "indicators" in data: data["indicators"] = self.process_multiple(data["indicators"]) data["indicatorsIds"] = self.process_multiple_ids(data["indicators"]) if "importFiles" in data: data["importFiles"] = self.process_multiple(data["importFiles"]) data["importFilesIds"] = self.process_multiple_ids(data["importFiles"]) # See aliases of GraphQL query in stix_core_object method if "name_alt" in data: data["name"] = data["name_alt"] del data["name_alt"] if "content_alt" in data: data["content"] = data["content_alt"] del data["content_alt"] return data
[docs] def upload_file(self, **kwargs): """upload a file to OpenCTI API :param `**kwargs`: arguments for file upload (required: `file_name` and `data`) :return: returns the query response for the file upload :rtype: dict """ file_name = kwargs.get("file_name", None) file_markings = kwargs.get("file_markings", None) data = kwargs.get("data", None) mime_type = kwargs.get("mime_type", "text/plain") if file_name is not None: self.app_logger.info("Uploading a file.") query = """ mutation UploadImport($file: Upload!, $fileMarkings: [String]) { uploadImport(file: $file, fileMarkings: $fileMarkings) { id name } } """ if data is None: with open(file_name, "rb") as f: data = f.read() if file_name.endswith(".json"): mime_type = "application/json" else: mime_type = magic.from_file(file_name, mime=True) query_vars = {"file": File(file_name, data, mime_type)} # optional file markings if file_markings is not None: query_vars["fileMarkings"] = file_markings return self.query(query, query_vars) else: self.app_logger.error("[upload] Missing parameter: file_name") return None
[docs] def create_draft(self, **kwargs): """Create a draft in OpenCTI API. :param draft_name: the name of the draft to create (required) :type draft_name: str :param entity_id: the entity ID to associate with the draft :type entity_id: str, optional :return: returns the draft workspace ID :rtype: str """ draft_name = kwargs.get("draft_name", None) entity_id = kwargs.get("entity_id", None) if draft_name is not None: self.app_logger.info("Creating a draft.") query = """ mutation draftWorkspaceAdd($input: DraftWorkspaceAddInput!) { draftWorkspaceAdd(input: $input) { id } } """ queryResult = self.query( query, {"input": {"name": draft_name, "entity_id": entity_id}}, ) return queryResult["data"]["draftWorkspaceAdd"]["id"] else: self.app_logger.error("[create_draft] Missing parameter: draft_name") return None
[docs] def upload_pending_file(self, **kwargs): """Upload a pending file to OpenCTI API. :param file_name: the name of the file to upload (required) :type file_name: str :param data: the file content, defaults to reading from file_name path :type data: str or bytes, optional :param mime_type: the MIME type of the file, defaults to "text/plain" :type mime_type: str, optional :param entity_id: the entity ID to associate with the file :type entity_id: str, optional :param file_markings: list of marking definition IDs to apply :type file_markings: list, optional :return: returns the query response for the file upload :rtype: dict """ file_name = kwargs.get("file_name", None) data = kwargs.get("data", None) mime_type = kwargs.get("mime_type", "text/plain") entity_id = kwargs.get("entity_id", None) file_markings = kwargs.get("file_markings", []) if file_name is not None: self.app_logger.info("Uploading a file.") query = """ mutation UploadPending($file: Upload!, $entityId: String, $file_markings: [String!]) { uploadPending(file: $file, entityId: $entityId, file_markings: $file_markings) { id name } } """ if data is None: with open(file_name, "rb") as f: data = f.read() if file_name.endswith(".json"): mime_type = "application/json" else: mime_type = magic.from_file(file_name, mime=True) return self.query( query, { "file": File(file_name, data, mime_type), "entityId": entity_id, "file_markings": file_markings, }, ) else: self.app_logger.error("[upload] Missing parameter: file_name") return None
[docs] def send_bundle_to_api(self, **kwargs): """Push a bundle to a queue through OpenCTI API. :param connector_id: the connector ID (required) :type connector_id: str :param bundle: the STIX bundle to push (required) :type bundle: str :param work_id: the work ID to associate with the bundle :type work_id: str, optional :return: returns the query response for the bundle push :rtype: dict """ connector_id = kwargs.get("connector_id", None) work_id = kwargs.get("work_id", None) bundle = kwargs.get("bundle", None) if connector_id is not None and bundle is not None: self.app_logger.info( "Pushing a bundle to queue through API", {connector_id} ) mutation = """ mutation StixBundlePush($connectorId: String!, $bundle: String!, $work_id: String) { stixBundlePush(connectorId: $connectorId, bundle: $bundle, work_id: $work_id) } """ return self.query( mutation, {"connectorId": connector_id, "bundle": bundle, "work_id": work_id}, ) else: self.app_logger.error( "[bundle push] Missing parameter: connector_id or bundle" ) return None
[docs] def get_stix_content(self, id): """Get the STIX content of any entity. :param id: the ID of the entity :type id: str :return: the STIX content in JSON :rtype: dict """ self.app_logger.info("Entity in JSON", {"id": id}) query = """ query StixQuery($id: String!) { stix(id: $id) } """ result = self.query(query, {"id": id}) return json.loads(result["data"]["stix"])
[docs] def connector_jwt(self): self.app_logger.info("Generating connector JWT token") query = """ mutation ConnectorJWT { connectorJWT } """ query_result = self.query(query) return query_result["data"]["connectorJWT"]
@staticmethod
[docs] def get_attribute_in_extension(key, stix_object) -> Any: """Get an attribute value from OpenCTI STIX extensions. Searches for the key in OpenCTI extension definitions, or falls back to the object's top-level attributes. :param key: the attribute key to retrieve :type key: str :param stix_object: the STIX object containing extensions :type stix_object: dict :return: the attribute value if found, None otherwise :rtype: Any """ if ( "extensions" in stix_object and "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba" in stix_object["extensions"] and key in stix_object["extensions"][ "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba" ] ): return stix_object["extensions"][ "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba" ][key] elif ( "extensions" in stix_object and "extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82" in stix_object["extensions"] and key in stix_object["extensions"][ "extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82" ] ): return stix_object["extensions"][ "extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82" ][key] elif key in stix_object and key not in ["type"]: return stix_object[key] return None
@staticmethod
[docs] def get_attribute_in_mitre_extension(key, stix_object) -> Any: """Get an attribute value from MITRE ATT&CK STIX extension. :param key: the attribute key to retrieve :type key: str :param stix_object: the STIX object containing extensions :type stix_object: dict :return: the attribute value if found, None otherwise :rtype: Any """ if ( "extensions" in stix_object and "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b" in stix_object["extensions"] and key in stix_object["extensions"][ "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b" ] ): return stix_object["extensions"][ "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b" ][key] return None
# Global cleanup functions for proxy certificates singleton def _cleanup_proxy_certificates(): """Clean up temporary certificate directory for proxy certificates. This function is called on normal program exit via atexit. """ global _PROXY_CERT_DIR if _PROXY_CERT_DIR and os.path.exists(_PROXY_CERT_DIR): try: shutil.rmtree(_PROXY_CERT_DIR) except Exception: # Silently fail cleanup - best effort pass finally: _PROXY_CERT_DIR = None def _signal_handler_proxy_cleanup(signum, frame): """Handle termination signals (SIGTERM/SIGINT) for proxy certificate cleanup. Performs cleanup and then raises SystemExit to allow normal shutdown procedures to complete. :param signum: Signal number :param frame: Current stack frame """ _cleanup_proxy_certificates() raise SystemExit(0)