pycti.connector.opencti_connector_helper
========================================

.. py:module:: pycti.connector.opencti_connector_helper

.. autoapi-nested-parse::

   OpenCTI Connector Helper module.

   This module provides the main helper class and utilities for building
   OpenCTI connectors. It handles connector registration, message queue
   communication, stream listening, scheduling, and STIX2 bundle processing.

   Key components:

   - OpenCTIConnectorHelper: Main class for connector development
   - ListenQueue: Handles message consumption from RabbitMQ or the HTTP callback API
   - ListenStream: Handles SSE stream consumption
   - PingAlive: Maintains the connector heartbeat with the platform
   - ConnectorInfo: Stores connector runtime information

   .. rubric:: Example

   >>> from pycti import OpenCTIConnectorHelper
   >>> helper = OpenCTIConnectorHelper(config)
   >>> helper.listen(callback_function)


Attributes
----------

.. autoapisummary::

   pycti.connector.opencti_connector_helper.TRUTHY
   pycti.connector.opencti_connector_helper.FALSY
   pycti.connector.opencti_connector_helper.app


Classes
-------

.. autoapisummary::

   pycti.connector.opencti_connector_helper.ListenQueue
   pycti.connector.opencti_connector_helper.PingAlive
   pycti.connector.opencti_connector_helper.StreamAlive
   pycti.connector.opencti_connector_helper.RateLimiter
   pycti.connector.opencti_connector_helper.RateLimitedCallback
   pycti.connector.opencti_connector_helper.BatchCallbackWrapper
   pycti.connector.opencti_connector_helper.ListenStream
   pycti.connector.opencti_connector_helper.ConnectorInfo
   pycti.connector.opencti_connector_helper.OpenCTIConnectorHelper


Functions
---------

.. autoapisummary::

   pycti.connector.opencti_connector_helper.killProgramHook
   pycti.connector.opencti_connector_helper.start_loop
   pycti.connector.opencti_connector_helper.get_config_variable
   pycti.connector.opencti_connector_helper.normalize_email_prefix
   pycti.connector.opencti_connector_helper.is_memory_certificate
   pycti.connector.opencti_connector_helper.ssl_verify_locations
   pycti.connector.opencti_connector_helper.data_to_temp_file
   pycti.connector.opencti_connector_helper.ssl_cert_chain
   pycti.connector.opencti_connector_helper.create_callback_ssl_context
   pycti.connector.opencti_connector_helper.create_mq_ssl_context


Module Contents
---------------

.. py:data:: TRUTHY
   :type: List[str]
   :value: ['yes', 'true', 'True']

   List of string values considered as boolean True.


.. py:data:: FALSY
   :type: List[str]
   :value: ['no', 'false', 'False']

   List of string values considered as boolean False.


.. py:data:: app


.. py:function:: killProgramHook(etype, value, tb) -> None

   Exception hook to terminate the program on unhandled exceptions.

   This function is used as a system exception hook to ensure the program
   terminates cleanly when an unhandled exception occurs. It is particularly
   useful for background threads.

   :param etype: The exception type (class)
   :type etype: type
   :param value: The exception instance
   :type value: BaseException
   :param tb: The traceback object
   :type tb: types.TracebackType


.. py:function:: start_loop(loop) -> None

   Start an asyncio event loop and run it forever.

   Sets the given event loop as the current loop for the thread and runs it
   indefinitely until it is stopped.

   :param loop: The asyncio event loop to start
   :type loop: asyncio.AbstractEventLoop


.. py:function:: get_config_variable(env_var: str, yaml_path: List, config: Optional[Dict] = None, isNumber: Optional[bool] = False, default=None, required=False) -> Union[bool, int, None, str]

   Retrieve a configuration variable from the environment or the YAML config.

   Looks up configuration values with the following precedence:

   1. Environment variable (highest priority)
   2. YAML configuration file
   3. Default value (lowest priority)

   Boolean string values listed in TRUTHY and FALSY ("yes", "true", "True",
   "no", "false", "False") are automatically converted to Python bool.

   :param env_var: Name of the environment variable to check
   :type env_var: str
   :param yaml_path: Two-element list specifying [section, key] in the YAML config
   :type yaml_path: List[str]
   :param config: Configuration dictionary loaded from the YAML file
   :type config: Dict or None
   :param isNumber: If True, convert the value to an integer
   :type isNumber: bool
   :param default: Default value if the variable is found in neither the environment nor the config
   :type default: Any
   :param required: If True and no value is found, raise ValueError
   :type required: bool
   :return: The configuration value as bool, int, str, or None
   :rtype: Union[bool, int, None, str]
   :raises ValueError: If required=True and no value is found

   .. rubric:: Example

   >>> get_config_variable("OPENCTI_URL", ["opencti", "url"], config)
   'http://localhost:8080'
   >>> get_config_variable("CONNECTOR_LOG_LEVEL", ["connector", "log_level"],
   ...                     config, default="INFO")
   'INFO'


.. py:function:: normalize_email_prefix(email: str) -> str

   Normalize the local part of an email address by replacing invalid characters.

   Replaces any character that is not valid in an email prefix with a hyphen.
   Valid characters are a-z, A-Z, 0-9, and the special characters ``.``,
   ``_``, ``+`` and ``-``. Consecutive hyphens are collapsed into one, and
   leading and trailing hyphens are removed.

   :param email: Email address to normalize
   :type email: str
   :return: Email address with a normalized local part
   :rtype: str
   :raises ValueError: If the email address does not contain an '@' symbol

   .. rubric:: Example

   >>> normalize_email_prefix("john.doe@example.com")
   'john.doe@example.com'
   >>> normalize_email_prefix("john@doe@example.com")
   'john-doe@example.com'
   >>> normalize_email_prefix("user!name@domain.com")
   'user-name@domain.com'


.. py:function:: is_memory_certificate(certificate: str) -> bool

   Check whether a certificate is provided as an in-memory PEM string.

   Determines whether the certificate data is a PEM-formatted string
   (starting with "-----BEGIN") rather than a file path.

   :param certificate: The certificate data to check (PEM string or file path)
   :type certificate: str
   :return: True if the certificate is a PEM string, False if it is a file path
   :rtype: bool

   .. rubric:: Example

   >>> is_memory_certificate("-----BEGIN CERTIFICATE-----\n...")
   True
   >>> is_memory_certificate("/path/to/cert.pem")
   False


.. py:function:: ssl_verify_locations(ssl_context: ssl.SSLContext, certdata: Optional[str]) -> None

   Load CA certificate verification locations into an SSL context.

   Configures the SSL context with certificate authority (CA) certificates
   used to verify peer certificates. Supports both file paths and in-memory
   PEM-formatted certificates.

   :param ssl_context: The SSL context to configure
   :type ssl_context: ssl.SSLContext
   :param certdata: CA certificate data as a file path or PEM string, or None to skip
   :type certdata: str or None

   .. rubric:: Example

   >>> ssl_ctx = ssl.create_default_context()
   >>> ssl_verify_locations(ssl_ctx, "/path/to/ca-bundle.crt")
   >>> ssl_verify_locations(ssl_ctx, "-----BEGIN CERTIFICATE-----\n...")


.. py:function:: data_to_temp_file(data: str) -> str

   Write data to a temporary file securely.

   Creates a temporary file with secure permissions (readable and writable
   only by the creating user). The file descriptor is not inherited by
   child processes.

   .. note::

      The caller is responsible for deleting the temporary file when it is
      no longer needed.

   :param data: The string data to write to the temporary file
   :type data: str
   :return: Absolute path to the created temporary file
   :rtype: str

   .. rubric:: Example

   >>> path = data_to_temp_file("-----BEGIN PRIVATE KEY-----\n...")
   >>> # Use the file...
   >>> os.unlink(path)  # Clean up when done


.. py:function:: ssl_cert_chain(ssl_context: ssl.SSLContext, cert_data: Optional[str], key_data: Optional[str], passphrase: Optional[str]) -> None

   Load a certificate chain and private key into an SSL context.

   Configures the SSL context with a client certificate and private key for
   mutual TLS authentication. Supports both file paths and in-memory
   PEM-formatted certificates and keys. When in-memory data is used,
   temporary files are created and cleaned up automatically.

   :param ssl_context: The SSL context to configure
   :type ssl_context: ssl.SSLContext
   :param cert_data: Certificate data as a file path or PEM string, or None to skip
   :type cert_data: str or None
   :param key_data: Private key data as a file path or PEM string, or None
   :type key_data: str or None
   :param passphrase: Passphrase for an encrypted private key, or None if unencrypted
   :type passphrase: str or None

   .. rubric:: Example

   >>> ssl_ctx = ssl.create_default_context()
   >>> ssl_cert_chain(ssl_ctx, "/path/to/cert.pem", "/path/to/key.pem", None)


.. py:function:: create_callback_ssl_context(config: Dict) -> ssl.SSLContext

   Create an SSL context for the API callback server.

   Creates and configures an SSL context suitable for the HTTPS callback
   server used in API listen protocol mode. Loads the certificate chain from
   the configuration.

   Configuration keys used:

   - LISTEN_PROTOCOL_API_SSL_KEY: Path or PEM string for the SSL private key
   - LISTEN_PROTOCOL_API_SSL_CERT: Path or PEM string for the SSL certificate
   - LISTEN_PROTOCOL_API_SSL_PASSPHRASE: Optional passphrase for the private key

   :param config: Configuration dictionary containing SSL settings
   :type config: Dict
   :return: Configured server-side SSL context for the callback server
   :rtype: ssl.SSLContext


.. py:function:: create_mq_ssl_context(config: Dict) -> ssl.SSLContext

   Create an SSL context for RabbitMQ message queue connections.

   Creates and configures an SSL context for secure connections to RabbitMQ.
   Supports CA verification, client certificates, and an optional bypass of
   certificate verification.
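
   The CA loading and the optional verification bypass can be approximated
   with the standard :py:mod:`ssl` module. This is a minimal sketch and not
   the helper's implementation. ``build_mq_client_context`` is a hypothetical
   name. Treating any value that starts with ``-----BEGIN`` as in-memory PEM
   mirrors the rule documented for is_memory_certificate.

   .. code-block:: python

      import ssl
      from typing import Optional


      def build_mq_client_context(ca: Optional[str] = None, verify: bool = True) -> ssl.SSLContext:
          """Hypothetical sketch of a client-side TLS context for a message broker."""
          # Client-side context: verifies the server certificate and hostname by default.
          context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
          if ca:
              if ca.startswith("-----BEGIN"):
                  # In-memory PEM data is passed through ``cadata``.
                  context.load_verify_locations(cadata=ca)
              else:
                  # Anything else is treated as a path to a CA bundle file.
                  context.load_verify_locations(cafile=ca)
          if not verify:
              # check_hostname must be disabled before verify_mode can become CERT_NONE.
              context.check_hostname = False
              context.verify_mode = ssl.CERT_NONE
          return context

   Disabling verification should stay an explicit opt-in, because it removes
   protection against man-in-the-middle attacks.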
   Configuration keys used:

   - MQ_USE_SSL_CA: CA certificate for server verification
   - MQ_USE_SSL_CERT: Client certificate for mutual TLS
   - MQ_USE_SSL_KEY: Client private key for mutual TLS
   - MQ_USE_SSL_REJECT_UNAUTHORIZED: Whether to reject servers whose certificate cannot be verified
   - MQ_USE_SSL_PASSPHRASE: Optional passphrase for the private key

   :param config: Configuration dictionary containing MQ SSL settings
   :type config: Dict
   :return: Configured SSL context for RabbitMQ connections
   :rtype: ssl.SSLContext


.. py:class:: ListenQueue(helper, opencti_token: str, config: Dict, connector_config: Dict, applicant_id: str, listen_protocol: str, listen_protocol_api_ssl: bool, listen_protocol_api_path: str, listen_protocol_api_port: int, callback: Callable[[Dict], str])

   Bases: :py:obj:`threading.Thread`

   Thread class for consuming messages from RabbitMQ or an HTTP API.

   Handles message consumption from either RabbitMQ (AMQP protocol) or an
   HTTP API endpoint, depending on the configured listen protocol. Messages
   are processed through a callback function provided at initialization.
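
   The consume-and-dispatch cycle can be pictured with a small,
   self-contained sketch in which a :py:class:`queue.Queue` stands in for the
   broker. ``QueueConsumer`` is hypothetical. ListenQueue raises its
   ValueError from run(), but this sketch validates the protocol in
   ``__init__`` so that the error surfaces in the caller's thread.

   .. code-block:: python

      import queue
      import threading
      from typing import Callable, Dict, List


      class QueueConsumer(threading.Thread):
          """Hypothetical consumer; a local queue plays the role of RabbitMQ."""

          SUPPORTED = ("AMQP", "API")

          def __init__(self, protocol: str, inbox: queue.Queue, callback: Callable[[Dict], str]) -> None:
              super().__init__(daemon=True)
              if protocol not in self.SUPPORTED:
                  raise ValueError(f"Unsupported listen protocol: {protocol}")
              self.inbox = inbox
              self.callback = callback
              self.exit_event = threading.Event()
              self.results: List[str] = []

          def run(self) -> None:
              # Poll with a timeout so that stop() is noticed promptly.
              while not self.exit_event.is_set():
                  try:
                      message = self.inbox.get(timeout=0.1)
                  except queue.Empty:
                      continue
                  self.results.append(self.callback(message))
                  self.inbox.task_done()

          def stop(self) -> None:
              self.exit_event.set()
              self.join()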
   This class supports two listen protocols:

   - AMQP: Connects to RabbitMQ and consumes messages from a queue
   - API: Starts an HTTP server and receives messages via POST requests

   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param opencti_token: Authentication token for the OpenCTI API
   :type opencti_token: str
   :param config: Global configuration dictionary
   :type config: Dict
   :param connector_config: Connector-specific configuration from registration
   :type connector_config: Dict
   :param applicant_id: ID of the user/connector making requests
   :type applicant_id: str
   :param listen_protocol: Protocol to use ("AMQP" or "API")
   :type listen_protocol: str
   :param listen_protocol_api_ssl: Whether to use SSL for the API protocol
   :type listen_protocol_api_ssl: bool
   :param listen_protocol_api_path: URL path for the API endpoint
   :type listen_protocol_api_path: str
   :param listen_protocol_api_port: Port for the API server
   :type listen_protocol_api_port: int
   :param callback: Function to call when processing messages
   :type callback: Callable[[Dict], str]

   Initialize the ListenQueue thread.

   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param opencti_token: Authentication token for the OpenCTI API
   :type opencti_token: str
   :param config: Global configuration dictionary
   :type config: Dict
   :param connector_config: Connector configuration from registration
   :type connector_config: Dict
   :param applicant_id: ID of the user/connector making requests
   :type applicant_id: str
   :param listen_protocol: Protocol to use ("AMQP" or "API")
   :type listen_protocol: str
   :param listen_protocol_api_ssl: Whether to use SSL for the API protocol
   :type listen_protocol_api_ssl: bool
   :param listen_protocol_api_path: URL path for the API endpoint
   :type listen_protocol_api_path: str
   :param listen_protocol_api_port: Port for the API server
   :type listen_protocol_api_port: int
   :param callback: Function to process received messages
   :type callback: Callable[[Dict], str]


   .. py:attribute:: pika_credentials
      :value: None


   .. py:attribute:: pika_parameters
      :value: None


   .. py:attribute:: pika_connection
      :value: None


   .. py:attribute:: channel
      :value: None


   .. py:attribute:: helper


   .. py:attribute:: callback


   .. py:attribute:: config


   .. py:attribute:: opencti_token


   .. py:attribute:: listen_protocol


   .. py:attribute:: listen_protocol_api_ssl


   .. py:attribute:: listen_protocol_api_path


   .. py:attribute:: listen_protocol_api_port


   .. py:attribute:: connector_applicant_id


   .. py:attribute:: host


   .. py:attribute:: vhost


   .. py:attribute:: use_ssl


   .. py:attribute:: port


   .. py:attribute:: user


   .. py:attribute:: password


   .. py:attribute:: connector_jwks


   .. py:attribute:: queue_name


   .. py:attribute:: exit_event


   .. py:attribute:: thread
      :value: None


   .. py:method:: is_token_valid(token)

      Check whether a token is valid.

      :return: True if the token signature is valid and the token has not
               expired, False otherwise


   .. py:method:: run() -> None

      Execute the message listening thread.

      Starts the appropriate listener based on the configured protocol:

      - AMQP: Connects to RabbitMQ and consumes messages from the queue
      - API: Starts a FastAPI/Uvicorn HTTP server to receive messages

      The thread runs until it is stopped via the stop() method or an error occurs.

      :raises ValueError: If an unsupported listen protocol is configured


   .. py:method:: stop()

      Stop the ListenQueue thread and close connections.

      This method sets the exit event, closes the RabbitMQ connection, and
      waits for the processing thread to complete.


.. py:class:: PingAlive(connector_logger, connector_id: str, api, get_state: Callable[[], Optional[Dict]], set_state: Callable[[str], None], metric, connector_info)

   Bases: :py:obj:`threading.Thread`

   Daemon thread that maintains the connector heartbeat with the OpenCTI platform.

   Periodically pings the OpenCTI API to signal that the connector is alive,
   and synchronizes the connector state between the local and remote instances.
   :param connector_logger: Logger instance for the connector
   :type connector_logger: logging.Logger
   :param connector_id: Unique identifier of the connector
   :type connector_id: str
   :param api: OpenCTI API client instance
   :type api: OpenCTIApiClient
   :param get_state: Function to retrieve the current connector state
   :type get_state: Callable[[], Optional[Dict]]
   :param set_state: Function to update the connector state
   :type set_state: Callable[[str], None]
   :param metric: Metric handler for recording ping statistics
   :type metric: OpenCTIMetricHandler
   :param connector_info: ConnectorInfo instance with runtime details
   :type connector_info: ConnectorInfo

   Initialize the PingAlive daemon thread.

   :param connector_logger: Logger instance for the connector
   :type connector_logger: logging.Logger
   :param connector_id: Unique identifier of the connector
   :type connector_id: str
   :param api: OpenCTI API client instance
   :type api: OpenCTIApiClient
   :param get_state: Function to retrieve the current connector state
   :type get_state: Callable[[], Optional[Dict]]
   :param set_state: Function to update the connector state
   :type set_state: Callable[[str], None]
   :param metric: Metric handler for recording ping statistics
   :type metric: OpenCTIMetricHandler
   :param connector_info: ConnectorInfo instance with runtime details
   :type connector_info: ConnectorInfo


   .. py:attribute:: connector_logger


   .. py:attribute:: connector_id


   .. py:attribute:: in_error
      :value: False


   .. py:attribute:: api


   .. py:attribute:: get_state


   .. py:attribute:: set_state


   .. py:attribute:: exit_event


   .. py:attribute:: metric


   .. py:attribute:: connector_info


   .. py:method:: ping() -> None

      Execute the ping loop to maintain the connector heartbeat.

      Pings the OpenCTI API every 40 seconds to:

      - Signal that the connector is alive
      - Send the current connector state and info
      - Receive and apply any remote state updates

      If the remote state differs from the local state, the local state is
      updated to match. This allows the state to be reset from the UI.
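
      The state reconciliation described above can be sketched as a loop
      driven by :py:class:`threading.Event`. This is an illustration under
      stated assumptions and not the PingAlive code. ``ping_api`` is a
      hypothetical callable that sends the local state as a JSON string and
      returns the state stored on the platform. The states are compared after
      parsing, so that differences in key order do not count as changes.

      .. code-block:: python

         import json
         import threading
         from typing import Callable, Dict, Optional


         def ping_loop(
             ping_api: Callable[[Optional[str]], Optional[str]],
             get_state: Callable[[], Optional[Dict]],
             set_state: Callable[[Optional[str]], None],
             exit_event: threading.Event,
             interval: float = 40.0,
         ) -> None:
             while not exit_event.is_set():
                 local = get_state()
                 local_json = json.dumps(local) if local is not None else None
                 # Hypothetical API call: announce liveness and fetch the remote state.
                 remote_json = ping_api(local_json)
                 remote = json.loads(remote_json) if remote_json is not None else None
                 if remote != local:
                     # The platform copy wins, so a reset made in the UI reaches the connector.
                     set_state(remote_json)
                 # Event.wait() doubles as a sleep that stop() can interrupt.
                 exit_event.wait(interval)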
      The loop continues until the exit_event is set.


   .. py:method:: run() -> None

      Start the PingAlive thread execution.

      Entry point for the thread that starts the ping loop.


   .. py:method:: stop() -> None

      Stop the PingAlive thread gracefully.

      Sets the exit event to signal the ping loop to terminate.


.. py:class:: StreamAlive(helper, q: queue.Queue)

   Bases: :py:obj:`threading.Thread`

   Watchdog thread that monitors SSE stream health via heartbeat messages.

   Monitors a queue for heartbeat signals from the stream listener. If no
   heartbeat is received within 45 seconds, the connector is stopped so that
   it can reconnect.

   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param q: Queue for receiving heartbeat signals from the stream listener
   :type q: Queue

   Initialize the StreamAlive watchdog thread.

   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param q: Queue for receiving heartbeat signals
   :type q: Queue


   .. py:attribute:: helper


   .. py:attribute:: q


   .. py:attribute:: exit_event


   .. py:method:: run() -> None

      Execute the stream health monitoring loop.

      Checks the queue for heartbeat signals every 5 seconds. If no signal
      is received for 45 seconds, terminates the connector to trigger a
      reconnection.


   .. py:method:: stop() -> None

      Stop the StreamAlive watchdog thread gracefully.

      Sets the exit event to signal the monitoring loop to terminate.


.. py:class:: RateLimiter(helper, max_per_minute: int)

   Rate limiter using a sliding window algorithm.
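
   A sliding-window limiter works in three steps:

   - It keeps the timestamps of recent calls.
   - It drops the timestamps that are older than the window.
   - It sleeps when the window is already full.

   The sketch below illustrates this technique with a
   :py:class:`collections.deque` and a lock. ``SlidingWindowLimiter`` and
   its injectable ``clock``/``sleep`` parameters are hypothetical and are not
   part of the RateLimiter signature. Sleeping while holding the lock
   serializes waiting callers. This keeps the count exact at the cost of
   concurrency.

   .. code-block:: python

      import threading
      import time
      from collections import deque
      from typing import Callable, Deque


      class SlidingWindowLimiter:
          """Hypothetical limiter allowing at most max_per_minute calls per window."""

          def __init__(
              self,
              max_per_minute: int,
              clock: Callable[[], float] = time.monotonic,
              sleep: Callable[[float], None] = time.sleep,
              window: float = 60.0,
          ) -> None:
              if max_per_minute < 1:
                  raise ValueError("max_per_minute must be at least 1")
              self.max_per_minute = max_per_minute
              self.window = window
              self.clock = clock
              self.sleep = sleep
              self.timestamps: Deque[float] = deque()
              self.lock = threading.Lock()

          def _evict(self, now: float) -> None:
              # Forget calls that have slid out of the window.
              while self.timestamps and now - self.timestamps[0] >= self.window:
                  self.timestamps.popleft()

          def wait_if_needed(self) -> float:
              with self.lock:
                  now = self.clock()
                  self._evict(now)
                  waited = 0.0
                  if len(self.timestamps) >= self.max_per_minute:
                      # Sleep until the oldest recorded call leaves the window.
                      waited = self.window - (now - self.timestamps[0])
                      self.sleep(waited)
                      now = self.clock()
                      self._evict(now)
                  self.timestamps.append(now)
                  return waited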
   Usage::

       # For batch mode (alternative to using max_per_minute in create_batch_callback):
       batch_callback = helper.create_batch_callback(
           process_batch_func, batch_size=100, batch_timeout=30
       )
       rate_limiter = helper.create_rate_limiter(max_per_minute=10)
       rate_limited_callback = rate_limiter.wrap(batch_callback)
       helper.listen_stream(message_callback=rate_limited_callback)

       # For non-batch mode:
       rate_limiter = helper.create_rate_limiter(max_per_minute=100)
       rate_limited_callback = rate_limiter.wrap(process_message)
       helper.listen_stream(message_callback=rate_limited_callback)

   Initialize the rate limiter.

   :param helper: OpenCTIConnectorHelper instance for logging
   :param max_per_minute: Maximum number of calls allowed per minute


   .. py:attribute:: helper


   .. py:attribute:: max_per_minute


   .. py:attribute:: timestamps
      :type: collections.deque


   .. py:method:: set_heartbeat_queue(q: queue.Queue) -> None

      Set the heartbeat queue used to send keepalive signals during rate limiting.

      :param q: Queue used by StreamAlive for heartbeat monitoring


   .. py:method:: stop() -> None

      Signal the rate limiter to stop waiting.


   .. py:method:: wait_if_needed() -> float

      Wait if the rate limit is exceeded. Thread-safe.

      Uses a sliding window algorithm to enforce max_per_minute, sleeping
      when necessary to stay within the limit.

      :return: Time spent waiting in seconds, or 0 if no wait was needed


   .. py:method:: wrap(callback: Callable) -> RateLimitedCallback

      Wrap a callback with rate limiting.

      :param callback: The callback to wrap (a function or a BatchCallbackWrapper)
      :return: A RateLimitedCallback that applies rate limiting before calling the original callback


.. py:class:: RateLimitedCallback(rate_limiter: RateLimiter, callback: Callable)

   A callback wrapper that applies rate limiting.

   This class wraps any callable and applies rate limiting before each call.
   It preserves the stop() and set_heartbeat_queue() methods of the wrapped
   callback, if they exist.

   Initialize the rate-limited callback.
   :param rate_limiter: The RateLimiter instance to use
   :param callback: The callback to wrap


   .. py:method:: __call__(*args, **kwargs)

      Call the wrapped callback with rate limiting.


   .. py:method:: stop() -> None

      Stop both the rate limiter and the wrapped callback.


   .. py:method:: set_heartbeat_queue(q: queue.Queue) -> None

      Set the heartbeat queue on both the rate limiter and the wrapped callback.


   .. py:property:: manages_state
      :type: bool

      The manages_state value propagated from the wrapped callback.


.. py:class:: BatchCallbackWrapper(helper, batch_callback: Callable, batch_size: Optional[int] = None, batch_timeout: Optional[float] = None)

   Wraps a batch callback so that it can be used with listen_stream, which
   delivers one message at a time.

   This class accumulates individual messages and processes them in batches,
   triggered by the batch_size or batch_timeout condition. It can be passed
   as the callback of the listen_stream method to enable batch processing.

   A dedicated timer thread handles all batch processing. This avoids race
   conditions between size-triggered and timeout-triggered batches.

   For rate limiting, use the max_per_minute parameter of create_batch_callback().

   Usage::

       # Basic batch processing:
       batch_callback = helper.create_batch_callback(
           process_batch_func, batch_size=100, batch_timeout=30
       )
       helper.listen_stream(message_callback=batch_callback)

       # With rate limiting (recommended):
       batch_callback = helper.create_batch_callback(
           process_batch_func, batch_size=100, batch_timeout=30, max_per_minute=10
       )
       helper.listen_stream(message_callback=batch_callback)

   Initialize the batch callback wrapper.

   :param helper: OpenCTIConnectorHelper instance
   :param batch_callback: Function to call with the batched events
   :param batch_size: Process the batch once this many events have accumulated
   :param batch_timeout: Process the batch after this many seconds


   .. py:attribute:: manages_state
      :value: True


   .. py:attribute:: helper


   .. py:attribute:: batch_callback


   .. py:attribute:: batch_size
      :value: None


   .. py:attribute:: batch_timeout
      :value: None


   .. py:attribute:: batch
      :type: List
      :value: []


   .. py:attribute:: batch_start_time
      :type: Optional[float]
      :value: None


   .. py:method:: __call__(msg) -> None

      Accumulate a message into the current batch.

      This method only accumulates messages. Batch processing is handled by
      the dedicated timer thread to avoid race conditions.

      :param msg: SSE message object from the stream


   .. py:method:: stop() -> None

      Stop the batch wrapper and process the remaining messages.

      This method signals the timer thread to stop and waits for it to
      finish processing any messages left in the batch.


   .. py:method:: set_heartbeat_queue(q: queue.Queue) -> None

      Set the heartbeat queue used to send keepalive signals during batch processing.

      :param q: Queue used by StreamAlive for heartbeat monitoring


.. py:class:: ListenStream(helper, callback: Callable, url: str, token: str, verify_ssl: bool, start_timestamp: Optional[str], live_stream_id: Optional[str], listen_delete: bool, no_dependencies: bool, recover_iso_date: Optional[str], with_inferences: bool)

   Bases: :py:obj:`threading.Thread`

   Thread class for consuming events from the OpenCTI SSE stream.

   Connects to an OpenCTI event stream and processes events through a
   callback function. Supports recovery from a specific point in time and
   several filtering options.
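
   The event handling described for run() can be sketched as a plain
   dispatch loop. In that loop, heartbeat traffic feeds the StreamAlive
   watchdog, data events reach the callback, and the latest event ID is
   persisted. ``SseEvent``, ``dispatch_events`` and the ``last_event_id``
   state key are hypothetical. The state keys that ListenStream actually
   uses are not documented here.

   .. code-block:: python

      import queue
      from dataclasses import dataclass
      from typing import Callable, Dict, Iterable, Optional


      @dataclass
      class SseEvent:
          """Hypothetical stand-in for an event yielded by an SSE client."""

          event: str
          id: Optional[str]
          data: str


      def dispatch_events(
          events: Iterable[SseEvent],
          callback: Callable[[SseEvent], None],
          heartbeat_queue: queue.Queue,
          state: Dict[str, str],
      ) -> None:
          for msg in events:
              if msg.event in ("heartbeat", "connected"):
                  # Keepalive traffic only tells the watchdog that the stream is healthy.
                  heartbeat_queue.put(msg.id)
                  continue
              # Any other event type is treated as data and handed to the connector.
              callback(msg)
              if msg.id is not None:
                  # Remember the position so that a restart can resume after this event.
                  state["last_event_id"] = msg.id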
   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param callback: Function to call for each stream event
   :type callback: Callable
   :param url: Base URL for the stream endpoint
   :type url: str
   :param token: Authentication token for the stream
   :type token: str
   :param verify_ssl: Whether to verify SSL certificates
   :type verify_ssl: bool
   :param start_timestamp: Timestamp to start reading from (format: "timestamp-0")
   :type start_timestamp: str or None
   :param live_stream_id: ID of the specific stream to connect to
   :type live_stream_id: str or None
   :param listen_delete: Whether to receive delete events
   :type listen_delete: bool
   :param no_dependencies: Whether to exclude dependency objects
   :type no_dependencies: bool
   :param recover_iso_date: ISO date to recover events from
   :type recover_iso_date: str or None
   :param with_inferences: Whether to include inferred relationships
   :type with_inferences: bool

   Initialize the ListenStream thread.

   :param helper: The OpenCTIConnectorHelper instance
   :type helper: OpenCTIConnectorHelper
   :param callback: Function to process stream events
   :type callback: Callable
   :param url: Base URL for the stream endpoint
   :type url: str
   :param token: Authentication token
   :type token: str
   :param verify_ssl: Whether to verify SSL certificates
   :type verify_ssl: bool
   :param start_timestamp: Starting timestamp for the stream position
   :type start_timestamp: str or None
   :param live_stream_id: Specific stream ID to connect to
   :type live_stream_id: str or None
   :param listen_delete: Whether to receive delete events
   :type listen_delete: bool
   :param no_dependencies: Whether to exclude dependencies
   :type no_dependencies: bool
   :param recover_iso_date: ISO date for event recovery
   :type recover_iso_date: str or None
   :param with_inferences: Whether to include inferences
   :type with_inferences: bool


   .. py:attribute:: helper


   .. py:attribute:: callback


   .. py:attribute:: url


   .. py:attribute:: token


   .. py:attribute:: verify_ssl


   .. py:attribute:: start_timestamp


   .. py:attribute:: live_stream_id


   .. py:attribute:: listen_delete


   .. py:attribute:: no_dependencies


   .. py:attribute:: recover_iso_date


   .. py:attribute:: with_inferences


   .. py:attribute:: exit_event


   .. py:method:: run() -> None

      Execute the stream listening loop.

      Connects to the OpenCTI SSE stream and processes events:

      - Initializes or restores the stream position from the connector state
      - Starts a StreamAlive watchdog for health monitoring
      - Processes heartbeat, connected, and data events
      - Updates the connector state with the latest event ID
      - Calls the callback function for each data event

      The loop continues until it is stopped or an error occurs.


   .. py:method:: stop()

      Stop the ListenStream thread.

      This method sets the exit event to signal the stream listening thread
      to stop. If the callback has a stop method (e.g., BatchCallbackWrapper
      or RateLimitedCallback), that method is called to ensure proper cleanup.


.. py:class:: ConnectorInfo(run_and_terminate: bool = False, buffering: bool = False, queue_threshold: float = 500.0, queue_messages_size: float = 0.0, next_run_datetime: datetime = None, last_run_datetime: datetime = None)

   Container for connector runtime information and status.

   Stores runtime metrics and status information about the connector. This
   information is sent to the OpenCTI platform during ping operations.

   :param run_and_terminate: Whether the connector runs once and terminates
   :type run_and_terminate: bool
   :param buffering: Whether the connector is currently buffering due to queue limits
   :type buffering: bool
   :param queue_threshold: Maximum allowed queue size in MB before buffering
   :type queue_threshold: float
   :param queue_messages_size: Current size of queued messages in MB
   :type queue_messages_size: float
   :param next_run_datetime: Scheduled datetime for the next connector run
   :type next_run_datetime: datetime or None
   :param last_run_datetime: Datetime of the last connector run
   :type last_run_datetime: datetime or None

   .. rubric:: Example

   >>> info = ConnectorInfo(run_and_terminate=False, queue_threshold=500.0)
   >>> info.buffering = True
   >>> info.queue_messages_size = 450.0
   >>> details = info.all_details

   Initialize ConnectorInfo with runtime parameters.

   :param run_and_terminate: Whether the connector runs once and terminates
   :type run_and_terminate: bool
   :param buffering: Whether the connector is buffering
   :type buffering: bool
   :param queue_threshold: Maximum queue size in MB
   :type queue_threshold: float
   :param queue_messages_size: Current queue size in MB
   :type queue_messages_size: float
   :param next_run_datetime: Next scheduled run time
   :type next_run_datetime: datetime or None
   :param last_run_datetime: Last run time
   :type last_run_datetime: datetime or None


   .. py:property:: all_details

      Get all connector information details as a dictionary.

      :return: Dictionary containing all connector status information
      :rtype: dict


   .. py:property:: run_and_terminate
      :type: bool

      Get the run_and_terminate flag.

      :return: Whether the connector runs once and terminates
      :rtype: bool


   .. py:property:: buffering
      :type: bool

      Get the buffering status.

      :return: Whether the connector is currently buffering
      :rtype: bool


   .. py:property:: queue_threshold
      :type: float

      Get the queue threshold value.

      :return: Maximum allowed queue size in MB
      :rtype: float


   .. py:property:: queue_messages_size
      :type: float

      Get the current queue messages size.

      :return: Current size of queued messages in MB
      :rtype: float


   .. py:property:: next_run_datetime
      :type: datetime

      Get the next scheduled run datetime.

      :return: Datetime of the next scheduled run, or None if no run is scheduled
      :rtype: datetime or None


   .. py:property:: last_run_datetime
      :type: datetime

      Get the last run datetime.

      :return: Datetime of the last connector run, or None if it has never run
      :rtype: datetime or None


.. py:class:: OpenCTIConnectorHelper(config: Dict, playbook_compatible: bool = False)

   Main helper class for developing OpenCTI connectors.
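
   The helper resolves its settings with the precedence documented for
   get_config_variable: environment first, then the YAML configuration, then
   the default. The following is a simplified sketch of that lookup.
   ``resolve_setting`` is hypothetical, and the sketch omits the ``isNumber``
   and ``required`` handling of the real function.

   .. code-block:: python

      import os
      from typing import Any, Dict, List, Optional

      # Copies of the module-level TRUTHY / FALSY constants documented above.
      TRUTHY: List[str] = ["yes", "true", "True"]
      FALSY: List[str] = ["no", "false", "False"]


      def resolve_setting(
          env_var: str, yaml_path: List[str], config: Optional[Dict] = None, default: Any = None
      ) -> Any:
          section, key = yaml_path
          if env_var in os.environ:
              # 1. The environment variable wins when it is set.
              value: Any = os.environ[env_var]
          elif config is not None and key in (config.get(section) or {}):
              # 2. Otherwise read config[section][key] from the YAML-derived dict.
              value = config[section][key]
          else:
              # 3. Fall back to the default.
              return default
          # Boolean-like strings become real booleans.
          if value in TRUTHY:
              return True
          if value in FALSY:
              return False
          return value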
   Provides a comprehensive API for connector development, handling:

   - Connector registration and configuration
   - Message queue communication (RabbitMQ/API)
   - SSE stream consumption
   - STIX2 bundle creation and submission
   - Scheduling and lifecycle management
   - Metrics and logging

   :param config: Configuration dictionary containing OpenCTI and connector settings
   :type config: Dict
   :param playbook_compatible: Whether the connector can be used in playbooks
   :type playbook_compatible: bool

   .. rubric:: Example

   >>> config = {
   ...     "opencti": {"url": "http://localhost:8080", "token": "xxx"},
   ...     "connector": {"id": "xxx", "name": "My Connector", "type": "EXTERNAL_IMPORT"}
   ... }
   >>> helper = OpenCTIConnectorHelper(config)
   >>> helper.listen(my_callback_function)

   .. attribute:: api

      OpenCTI API client for connector operations

   .. attribute:: api_impersonate

      API client that impersonates the request applicant

   .. attribute:: connector_logger

      Logger instance for connector messages

   .. attribute:: connector_info

      Runtime information about the connector

   .. attribute:: metric

      Prometheus metric handler

   Initialize the OpenCTIConnectorHelper.

   :param config: Configuration dictionary with OpenCTI and connector settings
   :type config: Dict
   :param playbook_compatible: Whether the connector can be used in playbooks
   :type playbook_compatible: bool


   .. py:class:: TimeUnit(*args, **kwds)

      Bases: :py:obj:`enum.Enum`

      Time unit enumeration for scheduling intervals (deprecated).

      Use the ISO 8601 duration format with schedule_iso() instead.

      :cvar SECONDS: 1 second
      :cvar MINUTES: 60 seconds
      :cvar HOURS: 3600 seconds
      :cvar DAYS: 86400 seconds
      :cvar WEEKS: 604800 seconds
      :cvar YEARS: 31536000 seconds


      .. py:attribute:: SECONDS
         :value: 1


      .. py:attribute:: MINUTES
         :value: 60


      .. py:attribute:: HOURS
         :value: 3600


      .. py:attribute:: DAYS
         :value: 86400


      .. py:attribute:: WEEKS
         :value: 604800


      .. py:attribute:: YEARS
         :value: 31536000


   .. py:attribute:: stream_collections


   .. py:attribute:: config


   .. py:attribute:: opencti_url
      :value: None


   .. py:attribute:: opencti_token
      :value: None


   .. py:attribute:: opencti_custom_headers
      :value: None


   .. py:attribute:: opencti_ssl_verify
      :value: False


   .. py:attribute:: opencti_json_logging
      :value: True


   .. py:attribute:: connect_id
      :value: None


   .. py:attribute:: connect_auto_create_service_account
      :value: False


   .. py:attribute:: connect_auto_create_service_account_confidence_level
      :value: 50


   .. py:attribute:: listen_protocol
      :value: ''


   .. py:attribute:: listen_protocol_api_port
      :value: 7070


   .. py:attribute:: listen_protocol_api_path
      :value: '/api/callback'


   .. py:attribute:: listen_protocol_api_ssl
      :value: False


   .. py:attribute:: listen_protocol_api_uri
      :value: 'https://127.0.0.1:7070'


   .. py:attribute:: connect_type
      :value: None


   .. py:attribute:: connect_queue_threshold
      :value: 500


   .. py:attribute:: connect_duration_period
      :value: None


   .. py:attribute:: connect_live_stream_id
      :value: None


   .. py:attribute:: connect_live_stream_listen_delete
      :value: True


   .. py:attribute:: connect_live_stream_no_dependencies
      :value: True


   .. py:attribute:: connect_live_stream_with_inferences
      :value: False


   .. py:attribute:: connect_live_stream_recover_iso_date
      :value: None


   .. py:attribute:: connect_live_stream_start_timestamp
      :value: None


   .. py:attribute:: connect_name
      :value: None


   .. py:attribute:: connect_confidence_level
      :value: None


   .. py:attribute:: connect_scope
      :value: 'not-applicable'


   .. py:attribute:: connect_auto
      :value: False


   .. py:attribute:: connect_auto_update
      :value: False


   .. py:attribute:: connect_enrichment_resolution
      :value: 'none'


   .. py:attribute:: bundle_send_to_queue
      :value: True


   .. py:attribute:: bundle_send_to_directory
      :value: False


   .. py:attribute:: bundle_send_to_directory_path
      :value: None


   .. py:attribute:: bundle_send_to_directory_retention
      :value: 7


   .. py:attribute:: bundle_send_to_s3
      :value: False


   .. py:attribute:: bundle_send_to_s3_bucket
      :value: None


   .. py:attribute:: bundle_send_to_s3_folder
      :value: 'connectors'


   .. py:attribute:: bundle_send_to_s3_retention
      :value: 7


   .. py:attribute:: s3_endpoint
      :value: None


   .. py:attribute:: s3_port
      :value: None


   .. py:attribute:: s3_access_key
      :value: None


   .. py:attribute:: s3_secret_key
      :value: None


   .. py:attribute:: s3_use_ssl
      :value: None


   .. py:attribute:: s3_bucket_region
      :value: None


   .. py:attribute:: connect_only_contextual
      :value: False


   .. py:attribute:: connect_xtm_one_intent
      :value: None


   .. py:attribute:: log_level
      :value: ''


   .. py:attribute:: connect_run_and_terminate
      :value: False


   .. py:attribute:: connect_validate_before_import
      :value: False


   .. py:attribute:: scheduler


   .. py:attribute:: connector_info


   .. py:attribute:: api


   .. py:attribute:: api_impersonate


   .. py:attribute:: connector_logger


   .. py:attribute:: log_debug


   .. py:attribute:: log_info


   .. py:attribute:: log_warning


   .. py:attribute:: log_error


   .. py:attribute:: metric


   .. py:attribute:: connector


   .. py:attribute:: work_id
      :value: None


   .. py:attribute:: validation_mode
      :value: 'workbench'


   .. py:attribute:: force_validation
      :value: False


   .. py:attribute:: draft_id
      :value: None


   .. py:attribute:: playbook
      :value: None


   .. py:attribute:: enrichment_shared_organizations
      :value: None


   .. py:attribute:: connector_id


   .. py:attribute:: applicant_id


   .. py:attribute:: connector_state


   .. py:attribute:: connector_config


   .. py:attribute:: queue_protocol
      :value: None


   .. py:attribute:: listen_queue
      :value: None


   .. py:method:: stop() -> None

      Stop the connector and clean up resources.

      This method stops all running threads (listen queue, ping thread) and
      unregisters the connector from OpenCTI.


   .. py:method:: get_name() -> Optional[Union[bool, int, str]]

      Get the connector name.

      :return: The name of the connector
      :rtype: Optional[Union[bool, int, str]]


   .. py:method:: get_stream_collection()

      Get the stream collection configuration.

      :return: Stream collection configuration dictionary
      :rtype: dict
      :raises ValueError: If no stream is connected


   .. py:method:: get_only_contextual() -> Optional[Union[bool, int, str]]

      Get the only_contextual configuration value.

      :return: Whether the connector processes only contextual data
      :rtype: Optional[Union[bool, int, str]]


   .. py:method:: get_run_and_terminate() -> Optional[Union[bool, int, str]]

      Get the run_and_terminate configuration value.

      :return: Whether the connector should run once and terminate
      :rtype: Optional[Union[bool, int, str]]


   .. py:method:: get_validate_before_import() -> Optional[Union[bool, int, str]]

      Get the validate_before_import configuration value.

      :return: Whether to validate data before importing
      :rtype: Optional[Union[bool, int, str]]


   .. py:method:: set_state(state) -> None

      Set the connector state.

      Stores the connector state as a JSON string so that it persists across
      runs. The state can be retrieved later using get_state().

      :param state: State object to store, or None to clear the state
      :type state: Dict or None


   .. py:method:: get_state() -> Optional[Dict]

      Get the connector state.

      Retrieves the connector state that was previously stored. The state is
      used to track progress and resume operations across runs.

      :return: The current state of the connector, or None if no state exists
      :rtype: Optional[Dict]


   .. py:method:: force_ping()

      Force a ping to the OpenCTI API to update the connector state.

      This method manually triggers a ping to synchronize the connector state
      with the OpenCTI platform.


   .. py:method:: next_run_datetime(duration_period_in_seconds: Union[int, float]) -> None

      Calculate and set the next scheduled run datetime in ISO format.

      :param duration_period_in_seconds: Duration in seconds until the next run
      :type duration_period_in_seconds: Union[int, float]


   .. py:method:: last_run_datetime() -> None

      Set the last run datetime to the current UTC time in ISO format.


   .. py:method:: check_connector_buffering() -> bool

      Check whether the RabbitMQ queue has exceeded the allowed threshold.

      :return: True if queue size exceeds threshold, False otherwise
      :rtype: bool

   .. py:method:: schedule_unit(message_callback: Callable[[], None], duration_period: Union[int, float, str], time_unit: TimeUnit) -> None

      Schedule connector execution with a time unit (deprecated).

      This method maintains backward compatibility for connectors that still configure intervals with a time unit. Use schedule_iso() instead.

      :param message_callback: The connector process callback function
      :type message_callback: Callable[[], None]
      :param duration_period: The connector interval value
      :type duration_period: Union[int, float, str]
      :param time_unit: The unit of time (YEARS, WEEKS, DAYS, HOURS, MINUTES, SECONDS)
      :type time_unit: TimeUnit

   .. py:method:: schedule_iso(message_callback: Callable[[], None], duration_period: str) -> None

      Schedule connector execution using ISO 8601 duration format.

      :param message_callback: The connector process callback function
      :type message_callback: Callable[[], None]
      :param duration_period: Duration in ISO 8601 format (e.g., "P18Y9W4DT11H9M8S")
      :type duration_period: str

   .. py:method:: schedule_process(message_callback: Callable[[], None], duration_period: Union[int, float]) -> None

      Schedule the execution of a connector process.

      If duration_period is zero or connect_run_and_terminate is True, the process runs once and terminates. Otherwise, the next run is scheduled based on the interval.

      :param message_callback: The connector process callback function
      :type message_callback: Callable[[], None]
      :param duration_period: The connector's interval in seconds
      :type duration_period: Union[int, float]

   .. py:method:: listen(message_callback: Callable[[Dict], str]) -> None

      Listen for messages from the queue and process them via callback.

      Starts a listener thread that consumes messages from RabbitMQ or the HTTP API (depending on the configured listen protocol) and processes each message through the provided callback function. This method blocks until the listener is stopped.
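
      A minimal sketch of a callback that satisfies this contract (an event dict in, a status string out) follows. The function name ``process_message`` and the ``entity_id`` key are hypothetical and only illustrate the shape; the keys actually present in the event data depend on the connector type.

      ```python
      from typing import Dict


      def process_message(data: Dict) -> str:
          # Hypothetical key, read only to show how the event payload is accessed.
          entity_id = data.get("entity_id", "unknown")
          # Connector-specific processing (fetching, converting, sending bundles) would go here.
          return f"Processed entity {entity_id}"
      ```

      Such a function would be registered with ``helper.listen(message_callback=process_message)``; its return value is the status message string the callback is expected to produce.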

      :param message_callback: Function to process incoming messages. Receives event data dict and should return a status message string.
      :type message_callback: Callable[[Dict], str]

   .. py:method:: listen_stream(message_callback: Callable, url: Optional[str] = None, token: Optional[str] = None, verify_ssl: Optional[bool] = None, start_timestamp: Optional[str] = None, live_stream_id: Optional[str] = None, listen_delete: Optional[bool] = None, no_dependencies: Optional[bool] = None, recover_iso_date: Optional[str] = None, with_inferences: Optional[bool] = None) -> ListenStream

      Start listening to an OpenCTI event stream.

      Connects to an SSE stream and processes events through the callback. Parameters default to connector configuration values if not specified.

      :param message_callback: Function to call for each stream event
      :type message_callback: Callable
      :param url: Base URL for stream (defaults to opencti_url)
      :type url: str or None
      :param token: Authentication token (defaults to opencti_token)
      :type token: str or None
      :param verify_ssl: Whether to verify SSL certificates
      :type verify_ssl: bool or None
      :param start_timestamp: Stream position to start from
      :type start_timestamp: str or None
      :param live_stream_id: Specific stream ID to connect to
      :type live_stream_id: str or None
      :param listen_delete: Whether to receive delete events
      :type listen_delete: bool or None
      :param no_dependencies: Whether to exclude dependencies
      :type no_dependencies: bool or None
      :param recover_iso_date: ISO date to recover events from
      :type recover_iso_date: str or None
      :param with_inferences: Whether to include inferred data
      :type with_inferences: bool or None
      :return: The started ListenStream thread
      :rtype: ListenStream

   .. py:method:: create_batch_callback(batch_callback: Callable, batch_size: Optional[int] = None, batch_timeout: Optional[float] = None, max_per_minute: Optional[int] = None) -> BatchCallbackWrapper

      Create a callback wrapper that batches messages.
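
      As an illustration of size-or-timeout batching, the single-threaded sketch below buffers events and flushes them when ``batch_size`` events have accumulated or when ``batch_timeout`` seconds have passed since the first buffered event, handing the callback a dictionary with ``events`` and ``batch_metadata`` keys. ``SimpleBatcher`` is a hypothetical class, not the BatchCallbackWrapper implementation: it assumes the timeout is checked only when a new event arrives, whereas a production wrapper would likely also need a timer to flush an idle batch. The injectable ``clock`` exists only to make the behaviour testable.

      ```python
      import time
      from typing import Any, Callable, Dict, List, Optional


      class SimpleBatcher:
          """Hypothetical size-or-timeout batcher (not pycti's BatchCallbackWrapper)."""

          def __init__(
              self,
              batch_callback: Callable[[Dict[str, Any]], None],
              batch_size: Optional[int] = None,
              batch_timeout: Optional[float] = None,
              clock: Callable[[], float] = time.monotonic,
          ) -> None:
              if batch_size is None and batch_timeout is None:
                  raise ValueError("batch_size or batch_timeout must be set")
              self.batch_callback = batch_callback
              self.batch_size = batch_size
              self.batch_timeout = batch_timeout
              self.clock = clock
              self.events: List[Any] = []
              self.started_at: Optional[float] = None  # arrival time of the batch's first event

          def __call__(self, event: Any) -> None:
              now = self.clock()
              if self.started_at is None:
                  self.started_at = now
              self.events.append(event)
              # Size is checked first; the timeout is only evaluated when an event arrives.
              if self.batch_size is not None and len(self.events) >= self.batch_size:
                  self.flush("size_limit")
              elif self.batch_timeout is not None and now - self.started_at >= self.batch_timeout:
                  self.flush("timeout")

          def flush(self, trigger_reason: str = "shutdown") -> None:
              if not self.events:
                  return
              now = self.clock()
              payload = {
                  "events": self.events,
                  "batch_metadata": {
                      "batch_size": len(self.events),
                      "trigger_reason": trigger_reason,
                      "elapsed_time": now - self.started_at,
                      "timestamp": time.time(),
                  },
              }
              # Reset the buffer, then hand the completed batch to the callback.
              self.events, self.started_at = [], None
              self.batch_callback(payload)
      ```

      A final ``flush()`` call (trigger reason ``"shutdown"``) empties any partial batch when the stream stops.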

      This factory method creates a BatchCallbackWrapper that can be used with listen_stream to enable batch processing of events. For rate limiting, use the max_per_minute parameter (recommended).

      Usage::

         # Basic batch processing:
         batch_callback = helper.create_batch_callback(
             process_batch_func,
             batch_size=100,
             batch_timeout=30
         )
         helper.listen_stream(message_callback=batch_callback)

         # With rate limiting (recommended):
         batch_callback = helper.create_batch_callback(
             process_batch_func,
             batch_size=100,
             batch_timeout=30,
             max_per_minute=10
         )
         helper.listen_stream(message_callback=batch_callback)

      The batch callback receives a dictionary with the following structure::

         {
             "events": [list of SSE messages],
             "batch_metadata": {
                 "batch_size": int,
                 "trigger_reason": str,  # "size_limit", "timeout", "shutdown"
                 "elapsed_time": float,
                 "timestamp": float,
             }
         }

      :param batch_callback: Function to call with batched events
      :type batch_callback: Callable[[dict], None]
      :param batch_size: Process the batch once this many events have accumulated (optional)
      :type batch_size: int or None
      :param batch_timeout: Process the batch after this many seconds (optional)
      :type batch_timeout: float or None
      :param max_per_minute: Maximum batch callbacks per minute (optional)
      :type max_per_minute: int or None
      :return: BatchCallbackWrapper instance for use with listen_stream
      :rtype: BatchCallbackWrapper
      :raises ValueError: If neither batch_size nor batch_timeout is specified
      :raises ValueError: If batch_size is not a positive integer
      :raises ValueError: If batch_timeout is not a positive number
      :raises ValueError: If max_per_minute is not a positive integer

   .. py:method:: create_rate_limiter(max_per_minute: int) -> RateLimiter

      Create a rate limiter that can wrap any callback.

      The rate limiter uses a sliding window algorithm to enforce a maximum number of calls per minute. It can be used with both batch and non-batch callbacks.
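
      A sliding window can be sketched by keeping the timestamps of calls admitted during the last 60 seconds: timestamps older than the window are evicted on every attempt, and a new call is admitted only while fewer than ``max_per_minute`` remain. ``SlidingWindowLimiter`` and ``try_acquire()`` are hypothetical names and this is not the RateLimiter implementation; only ``wrap()`` mirrors the ``rate_limiter.wrap(callback)`` usage documented for this method. The clock and sleep functions are injectable so the behaviour can be tested without waiting.

      ```python
      import time
      from collections import deque
      from typing import Any, Callable, Deque

      WINDOW_SECONDS = 60.0


      class SlidingWindowLimiter:
          """Hypothetical limiter: at most max_per_minute calls in any 60-second window."""

          def __init__(
              self,
              max_per_minute: int,
              clock: Callable[[], float] = time.monotonic,
              sleep: Callable[[float], None] = time.sleep,
          ) -> None:
              if not isinstance(max_per_minute, int) or max_per_minute <= 0:
                  raise ValueError("max_per_minute must be a positive integer")
              self.max_per_minute = max_per_minute
              self.clock = clock
              self.sleep = sleep
              self.calls: Deque[float] = deque()  # timestamps of admitted calls, oldest first

          def try_acquire(self) -> bool:
              now = self.clock()
              # Evict timestamps that have slid out of the 60-second window.
              while self.calls and now - self.calls[0] >= WINDOW_SECONDS:
                  self.calls.popleft()
              if len(self.calls) < self.max_per_minute:
                  self.calls.append(now)
                  return True
              return False

          def wrap(self, callback: Callable[..., Any]) -> Callable[..., Any]:
              def rate_limited(*args: Any, **kwargs: Any) -> Any:
                  while not self.try_acquire():
                      # Wait until the oldest admitted call leaves the window.
                      self.sleep(max(0.0, WINDOW_SECONDS - (self.clock() - self.calls[0])))
                  return callback(*args, **kwargs)

              return rate_limited
      ```

      Storing one timestamp per admitted call gives an exact count for any 60-second window, at the cost of memory proportional to ``max_per_minute``, which stays small for per-minute limits.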

      Usage::

         # With batch callback:
         batch_callback = helper.create_batch_callback(
             process_batch_func,
             batch_size=100,
             batch_timeout=30
         )
         rate_limiter = helper.create_rate_limiter(max_per_minute=10)
         rate_limited = rate_limiter.wrap(batch_callback)
         helper.listen_stream(message_callback=rate_limited)

         # With non-batch callback:
         rate_limiter = helper.create_rate_limiter(max_per_minute=100)
         rate_limited = rate_limiter.wrap(process_message)
         helper.listen_stream(message_callback=rate_limited)

      :param max_per_minute: Maximum number of calls allowed per 60-second window
      :type max_per_minute: int
      :return: RateLimiter instance that can wrap callbacks
      :rtype: RateLimiter
      :raises ValueError: If max_per_minute is not a positive integer

   .. py:method:: get_opencti_url() -> Optional[Union[bool, int, str]]

      Get the OpenCTI URL.

      :return: The URL of the OpenCTI platform
      :rtype: Optional[Union[bool, int, str]]

   .. py:method:: get_opencti_token() -> Optional[Union[bool, int, str]]

      Get the OpenCTI API token.

      :return: The API token for OpenCTI authentication
      :rtype: Optional[Union[bool, int, str]]

   .. py:method:: get_connector() -> pycti.connector.opencti_connector.OpenCTIConnector

      Get the OpenCTIConnector instance.

      :return: The OpenCTIConnector instance
      :rtype: OpenCTIConnector

   .. py:method:: date_now() -> str

      Get the current UTC datetime in ISO 8601 format.

      Returns the current time with timezone offset notation (+00:00).

      :return: Current UTC datetime as ISO 8601 string (e.g., "2024-01-15T10:30:00+00:00")
      :rtype: str

      .. rubric:: Example

      >>> helper.date_now()
      '2024-01-15T10:30:00+00:00'

   .. py:method:: date_now_z() -> str

      Get the current UTC datetime in ISO 8601 format with Z suffix.

      Returns the current time with a 'Z' suffix instead of '+00:00'. This format is commonly used in STIX objects.

      :return: Current UTC datetime as ISO 8601 string (e.g., "2024-01-15T10:30:00Z")
      :rtype: str

      .. rubric:: Example

      >>> helper.date_now_z()
      '2024-01-15T10:30:00Z'

   .. py:method:: send_stix2_bundle(bundle: str, **kwargs) -> list

      Send a STIX2 bundle to the OpenCTI platform.

      Processes and sends a STIX2 bundle to OpenCTI via the message queue or API. The bundle is split into smaller chunks and sent with proper sequencing. Supports validation workflows, draft mode, and export to a directory or S3.

      :param bundle: Valid STIX2 bundle as a JSON string
      :type bundle: str
      :param work_id: Work ID for tracking the import job (default: self.work_id)
      :type work_id: str, optional
      :param validation_mode: Validation mode, either "workbench" or "draft" (default: self.validation_mode)
      :type validation_mode: str, optional
      :param draft_id: Draft context ID to send the bundle to (default: self.draft_id)
      :type draft_id: str, optional
      :param entities_types: List of entity types to filter (default: None)
      :type entities_types: list, optional
      :param update: Whether to update existing data in the database (default: False)
      :type update: bool, optional
      :param event_version: Event version for the bundle (default: None)
      :type event_version: str, optional
      :param bypass_validation: Skip the validation workflow (default: False)
      :type bypass_validation: bool, optional
      :param force_validation: Force validation even if not configured (default: self.force_validation)
      :type force_validation: bool, optional
      :param entity_id: Entity ID for context (default: None)
      :type entity_id: str, optional
      :param file_markings: File markings to apply (default: None)
      :type file_markings: list, optional
      :param file_name: File name for workbench upload (default: None)
      :type file_name: str, optional
      :param send_to_queue: Whether to send the bundle to the message queue (default: self.bundle_send_to_queue)
      :type send_to_queue: bool, optional
      :param cleanup_inconsistent_bundle: Clean up inconsistent bundle data (default: False)
      :type cleanup_inconsistent_bundle: bool, optional
      :param send_to_directory: Whether to write the bundle to a directory (default: self.bundle_send_to_directory)
      :type send_to_directory: bool, optional
      :param send_to_directory_path: Directory path for bundle export (default: self.bundle_send_to_directory_path)
      :type send_to_directory_path: str, optional
      :param send_to_directory_retention: Days to retain exported files (default: self.bundle_send_to_directory_retention)
      :type send_to_directory_retention: int, optional
      :param send_to_s3: Whether to upload the bundle to S3 (default: self.bundle_send_to_s3)
      :type send_to_s3: bool, optional
      :param no_split: Whether to send the bundle without splitting it (default: False)
      :type no_split: bool, optional
      :return: List of processed bundle chunks
      :rtype: list
      :raises ValueError: If the bundle is empty or contains no valid objects

   .. py:method:: stix2_deduplicate_objects(items) -> list
      :staticmethod:

      Deduplicate STIX2 objects by their ID.

      Removes duplicate STIX2 objects from a list, keeping only the first occurrence of each unique ID.

      :param items: List of STIX2 objects to deduplicate
      :type items: list
      :return: Deduplicated list of STIX2 objects
      :rtype: list

   .. py:method:: stix2_create_bundle(items) -> Optional[str]
      :staticmethod:

      Create a STIX2 bundle from a list of objects.

      Wraps STIX2 objects in a valid bundle structure with a generated UUID. Automatically serializes objects if they are STIX2 library instances.

      :param items: List of STIX2 objects (dicts or STIX2 library objects)
      :type items: list
      :return: JSON string of the STIX2 bundle, or None if items is empty
      :rtype: Optional[str]

   .. py:method:: check_max_tlp(tlp: str, max_tlp: str) -> bool
      :staticmethod:

      Check if a TLP level is within the allowed maximum TLP level.

      Validates that the given TLP marking is at or below the maximum allowed TLP level. Useful for filtering data based on sharing restrictions.

      :param tlp: The TLP level to check (e.g., "TLP:GREEN", "TLP:AMBER")
      :type tlp: str
      :param max_tlp: The highest allowed TLP level for comparison
      :type max_tlp: str
      :return: True if the TLP level is within the allowed range, False otherwise
      :rtype: bool

      .. rubric:: Example

      >>> OpenCTIConnectorHelper.check_max_tlp("TLP:GREEN", "TLP:AMBER")
      True
      >>> OpenCTIConnectorHelper.check_max_tlp("TLP:RED", "TLP:GREEN")
      False

   .. py:method:: get_attribute_in_extension(key: str, stix_object: Dict) -> any
      :staticmethod:

      Get an attribute from OpenCTI STIX extensions.

      Retrieves a value from OpenCTI's custom STIX extension definitions. Checks both the primary OpenCTI extension and the SDO extension, falling back to the object's root attributes if the key is not found in the extensions.

      :param key: The attribute key to retrieve
      :type key: str
      :param stix_object: A STIX object dictionary
      :type stix_object: Dict
      :return: The attribute value, or None if not found
      :rtype: any

      .. rubric:: Example

      >>> obj = {"extensions": {"extension-definition--ea279b3e-...": {"score": 85}}}
      >>> OpenCTIConnectorHelper.get_attribute_in_extension("score", obj)
      85

   .. py:method:: get_attribute_in_mitre_extension(key: str, stix_object: Dict) -> any
      :staticmethod:

      Get an attribute from the MITRE ATT&CK STIX extension.

      Retrieves a value from the MITRE ATT&CK custom STIX extension definition used for attack patterns and techniques.

      :param key: The attribute key to retrieve
      :type key: str
      :param stix_object: A STIX object dictionary
      :type stix_object: Dict
      :return: The attribute value, or None if not found
      :rtype: any

      .. rubric:: Example

      >>> obj = {"extensions": {"extension-definition--322b8f77-...": {"x_mitre_version": "1.0"}}}
      >>> OpenCTIConnectorHelper.get_attribute_in_mitre_extension("x_mitre_version", obj)
      '1.0'

   .. py:method:: get_data_from_enrichment(data, standard_id, opencti_entity)

      Extract STIX entity and objects from enrichment data.

      :param data: The enrichment data containing a bundle
      :type data: dict
      :param standard_id: The STIX standard ID of the entity
      :type standard_id: str
      :param opencti_entity: The OpenCTI entity object
      :type opencti_entity: dict
      :return: Dictionary containing stix_entity and stix_objects
      :rtype: dict
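
      A minimal sketch of the extraction follows, assuming the enrichment data carries a STIX bundle under ``data["bundle"]`` whose ``objects`` list contains the enriched entity, matched by its ``standard_id``. The helper name ``extract_enrichment_data`` is hypothetical. The ``opencti_entity`` argument is left out because this sketch does not cover the case where no bundle is supplied; the real method may rely on it to build one.

      ```python
      from typing import Any, Dict, List, Optional


      def extract_enrichment_data(data: Dict[str, Any], standard_id: str) -> Dict[str, Any]:
          """Hypothetical helper: locate the enriched entity inside the bundle carried by data."""
          bundle: Optional[Dict[str, Any]] = data.get("bundle")
          if bundle is None:
              # Assumption: the no-bundle fallback is out of scope for this sketch.
              raise ValueError("no bundle in enrichment data")
          stix_objects: List[Dict[str, Any]] = bundle.get("objects", [])
          # The enriched entity is the object whose STIX id equals standard_id.
          matches = [obj for obj in stix_objects if obj.get("id") == standard_id]
          if not matches:
              raise ValueError(f"{standard_id} not found in bundle")
          return {"stix_entity": matches[0], "stix_objects": stix_objects}
      ```

      Returning the full object list alongside the entity lets an enrichment connector append new objects and send everything back as one bundle.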