pycti.connector.opencti_connector_helper

OpenCTI Connector Helper module.

This module provides the main helper class and utilities for building OpenCTI connectors. It handles connector registration, message queue communication, stream listening, scheduling, and STIX2 bundle processing.

Key components:
  • OpenCTIConnectorHelper: Main class for connector development

  • ListenQueue: Handles RabbitMQ message consumption

  • ListenStream: Handles SSE stream consumption

  • PingAlive: Maintains connector heartbeat with the platform

  • ConnectorInfo: Stores connector runtime information

Example

>>> from pycti import OpenCTIConnectorHelper
>>> helper = OpenCTIConnectorHelper(config)
>>> helper.listen(callback_function)

Attributes

TRUTHY

List of string values considered as boolean True.

FALSY

List of string values considered as boolean False.

app

Classes

ListenQueue

Thread class for consuming messages from RabbitMQ or HTTP API.

PingAlive

Daemon thread that maintains connector heartbeat with OpenCTI platform.

StreamAlive

Watchdog thread that monitors SSE stream health via heartbeat messages.

RateLimiter

Rate limiter using sliding window algorithm.

RateLimitedCallback

A callback wrapper that applies rate limiting.

BatchCallbackWrapper

Wraps a batch callback to work with single-message listen_stream.

ListenStream

Thread class for consuming events from OpenCTI SSE stream.

ConnectorInfo

Container for connector runtime information and status.

OpenCTIConnectorHelper

Main helper class for developing OpenCTI connectors.

Functions

killProgramHook(→ None)

Exception hook to terminate the program on unhandled exceptions.

start_loop(→ None)

Start an asyncio event loop and run it forever.

get_config_variable(→ Union[bool, int, None, str])

Retrieve a configuration variable from environment or YAML config.

normalize_email_prefix(→ str)

Normalize the local part of an email address by replacing invalid characters.

is_memory_certificate(→ bool)

Check if a certificate is provided as a PEM string in memory.

ssl_verify_locations(→ None)

Load CA certificate verification locations into an SSL context.

data_to_temp_file(→ str)

Write data to a temporary file securely.

ssl_cert_chain(→ None)

Load a certificate chain and private key into an SSL context.

create_callback_ssl_context(→ ssl.SSLContext)

Create an SSL context for the API callback server.

create_mq_ssl_context(→ ssl.SSLContext)

Create an SSL context for RabbitMQ message queue connections.

Module Contents

pycti.connector.opencti_connector_helper.TRUTHY: List[str] = ['yes', 'true', 'True'][source]

List of string values considered as boolean True.

pycti.connector.opencti_connector_helper.FALSY: List[str] = ['no', 'false', 'False'][source]

List of string values considered as boolean False.

pycti.connector.opencti_connector_helper.app[source]
pycti.connector.opencti_connector_helper.killProgramHook(etype, value, tb) None[source]

Exception hook to terminate the program on unhandled exceptions.

This function is used as a system exception hook to ensure the program terminates cleanly when an unhandled exception occurs, particularly useful for background threads.

Parameters:
  • etype (type) – The exception type (class)

  • value (BaseException) – The exception instance

  • tb (types.TracebackType) – The traceback object

pycti.connector.opencti_connector_helper.start_loop(loop) None[source]

Start an asyncio event loop and run it forever.

Sets the given event loop as the current loop for the thread and runs it indefinitely until stopped.

Parameters:

loop (asyncio.AbstractEventLoop) – The asyncio event loop to start

pycti.connector.opencti_connector_helper.get_config_variable(env_var: str, yaml_path: List, config: Dict | None = None, isNumber: bool | None = False, default=None, required=False) bool | int | None | str[source]

Retrieve a configuration variable from environment or YAML config.

Looks up configuration values with the following precedence: 1. Environment variable (highest priority) 2. YAML configuration file 3. Default value (lowest priority)

Boolean string values (“yes”, “true”, “True”, “no”, “false”, “False”) are automatically converted to Python bool.

Parameters:
  • env_var (str) – Name of the environment variable to check

  • yaml_path (List[str]) – Two-element list specifying [section, key] in YAML config

  • config (Dict) – Configuration dictionary loaded from YAML file

  • isNumber (bool) – If True, convert the value to integer

  • default (any) – Default value if not found in env or config

  • required (bool) – If True and no value found, raise ValueError

Returns:

The configuration value as bool, int, str, or None

Return type:

Union[bool, int, None, str]

Raises:

ValueError – If required=True and no value is found

Example

>>> get_config_variable("OPENCTI_URL", ["opencti", "url"], config)
'http://localhost:8080'
>>> get_config_variable("CONNECTOR_LOG_LEVEL", ["connector", "log_level"],
...                     config, default="INFO")
'INFO'
pycti.connector.opencti_connector_helper.normalize_email_prefix(email: str) str[source]

Normalize the local part of an email address by replacing invalid characters.

Replaces any characters not valid in email prefixes with hyphens. Valid characters include: a-z, A-Z, 0-9, and special chars: . _ + - Consecutive hyphens are collapsed and leading/trailing hyphens are removed.

Parameters:

email (str) – Email address to normalize

Returns:

Normalized email address with valid local part

Return type:

str

Raises:

ValueError – If the email address does not contain an ‘@’ symbol

Example

>>> normalize_email_prefix("john.doe@example.com")
'john.doe@example.com'
>>> normalize_email_prefix("john@doe@example.com")
'john-doe@example.com'
>>> normalize_email_prefix("user!name@domain.com")
'user-name@domain.com'
pycti.connector.opencti_connector_helper.is_memory_certificate(certificate: str) bool[source]

Check if a certificate is provided as a PEM string in memory.

Determines whether the certificate data is an in-memory PEM-formatted string (starting with “—–BEGIN”) rather than a file path.

Parameters:

certificate (str) – The certificate data to check (PEM string or file path)

Returns:

True if the certificate is a PEM string, False if it’s a file path

Return type:

bool

Example

>>> is_memory_certificate("-----BEGIN CERTIFICATE-----\n...")
True
>>> is_memory_certificate("/path/to/cert.pem")
False
pycti.connector.opencti_connector_helper.ssl_verify_locations(ssl_context: ssl.SSLContext, certdata: str | None) None[source]

Load CA certificate verification locations into an SSL context.

Configures the SSL context with certificate authority (CA) certificates for verifying peer certificates. Supports both file paths and in-memory PEM-formatted certificates.

Parameters:
  • ssl_context (ssl.SSLContext) – The SSL context to configure

  • certdata (str or None) – CA certificate data as file path or PEM string, or None to skip

Example

>>> ssl_ctx = ssl.create_default_context()
>>> ssl_verify_locations(ssl_ctx, "/path/to/ca-bundle.crt")
>>> ssl_verify_locations(ssl_ctx, "-----BEGIN CERTIFICATE-----\n...")
pycti.connector.opencti_connector_helper.data_to_temp_file(data: str) str[source]

Write data to a temporary file securely.

Creates a temporary file with secure permissions (readable and writable only by the creating user). The file descriptor is not inherited by child processes.

Note

The caller is responsible for deleting the temporary file when it is no longer needed.

Parameters:

data (str) – The string data to write to the temporary file

Returns:

Absolute path to the created temporary file

Return type:

str

Example

>>> path = data_to_temp_file("-----BEGIN PRIVATE KEY-----\n...")
>>> # Use the file...
>>> os.unlink(path)  # Clean up when done
pycti.connector.opencti_connector_helper.ssl_cert_chain(ssl_context: ssl.SSLContext, cert_data: str | None, key_data: str | None, passphrase: str | None) None[source]

Load a certificate chain and private key into an SSL context.

Configures the SSL context with a client certificate and private key for mutual TLS authentication. Supports both file paths and in-memory PEM-formatted certificates/keys. Temporary files are created and cleaned up automatically when using in-memory data.

Parameters:
  • ssl_context (ssl.SSLContext) – The SSL context to configure

  • cert_data (str or None) – Certificate data as file path or PEM string, or None to skip

  • key_data (str or None) – Private key data as file path or PEM string, or None

  • passphrase (str or None) – Passphrase for encrypted private key, or None if unencrypted

Example

>>> ssl_ctx = ssl.create_default_context()
>>> ssl_cert_chain(ssl_ctx, "/path/to/cert.pem", "/path/to/key.pem", None)
pycti.connector.opencti_connector_helper.create_callback_ssl_context(config: Dict) ssl.SSLContext[source]

Create an SSL context for the API callback server.

Creates and configures an SSL context suitable for the HTTPS callback server used in API listen protocol mode. Loads certificate chain from configuration.

Configuration keys used:
  • LISTEN_PROTOCOL_API_SSL_KEY: Path or PEM string for SSL private key

  • LISTEN_PROTOCOL_API_SSL_CERT: Path or PEM string for SSL certificate

  • LISTEN_PROTOCOL_API_SSL_PASSPHRASE: Optional passphrase for private key

Parameters:

config (Dict) – Configuration dictionary containing SSL settings

Returns:

Configured SSL context for client authentication

Return type:

ssl.SSLContext

pycti.connector.opencti_connector_helper.create_mq_ssl_context(config: Dict) ssl.SSLContext[source]

Create an SSL context for RabbitMQ message queue connections.

Creates and configures an SSL context for secure connections to RabbitMQ. Supports CA verification, client certificates, and optional certificate verification bypass.

Configuration keys used:
  • MQ_USE_SSL_CA: CA certificate for server verification

  • MQ_USE_SSL_CERT: Client certificate for mutual TLS

  • MQ_USE_SSL_KEY: Client private key for mutual TLS

  • MQ_USE_SSL_REJECT_UNAUTHORIZED: Whether to verify server certificate

  • MQ_USE_SSL_PASSPHRASE: Optional passphrase for private key

Parameters:

config (Dict) – Configuration dictionary containing MQ SSL settings

Returns:

Configured SSL context for RabbitMQ connections

Return type:

ssl.SSLContext

class pycti.connector.opencti_connector_helper.ListenQueue(helper, opencti_token: str, config: Dict, connector_config: Dict, applicant_id: str, listen_protocol: str, listen_protocol_api_ssl: bool, listen_protocol_api_path: str, listen_protocol_api_port: int, callback: Callable[[Dict], str])[source]

Bases: threading.Thread

Thread class for consuming messages from RabbitMQ or HTTP API.

Handles message consumption from either RabbitMQ (AMQP protocol) or an HTTP API endpoint, depending on the configured listen protocol. Messages are processed through a callback function provided at initialization.

This class supports two listen protocols:
  • AMQP: Connects to RabbitMQ and consumes messages from a queue

  • API: Starts an HTTP server and receives messages via POST requests

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • opencti_token (str) – Authentication token for OpenCTI API

  • config (Dict) – Global configuration dictionary

  • connector_config (Dict) – Connector-specific configuration from registration

  • applicant_id (str) – ID of the user/connector making requests

  • listen_protocol (str) – Protocol to use (“AMQP” or “API”)

  • listen_protocol_api_ssl (bool) – Whether to use SSL for API protocol

  • listen_protocol_api_path (str) – URL path for API endpoint

  • listen_protocol_api_port (int) – Port for API server

  • callback (Callable[[Dict], str]) – Function to call when processing messages

Initialize the ListenQueue thread.

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • opencti_token (str) – Authentication token for OpenCTI API

  • config (Dict) – Global configuration dictionary

  • connector_config (Dict) – Connector configuration from registration

  • applicant_id (str) – ID of the user/connector making requests

  • listen_protocol (str) – Protocol to use (“AMQP” or “API”)

  • listen_protocol_api_ssl (bool) – Whether to use SSL for API protocol

  • listen_protocol_api_path (str) – URL path for API endpoint

  • listen_protocol_api_port (int) – Port for API server

  • callback (Callable[[Dict], str]) – Function to process received messages

pika_credentials = None[source]
pika_parameters = None[source]
pika_connection = None[source]
channel = None[source]
helper[source]
callback[source]
config[source]
opencti_token[source]
listen_protocol[source]
listen_protocol_api_ssl[source]
listen_protocol_api_path[source]
listen_protocol_api_port[source]
connector_applicant_id[source]
host[source]
vhost[source]
use_ssl[source]
port[source]
user[source]
password[source]
connector_jwks[source]
queue_name[source]
exit_event[source]
thread = None[source]
is_token_valid(token)[source]

Returns True if the signature is valid and token is not expired. Returns False otherwise.

run() None[source]

Execute the message listening thread.

Starts the appropriate listener based on the configured protocol: - AMQP: Connects to RabbitMQ and consumes messages from the queue - API: Starts a FastAPI/Uvicorn HTTP server to receive messages

The thread runs until stopped via the stop() method or an error occurs.

Raises:

ValueError – If an unsupported listen protocol is configured

stop()[source]

Stop the ListenQueue thread and close connections.

This method sets the exit event, closes the RabbitMQ connection, and waits for the processing thread to complete.

class pycti.connector.opencti_connector_helper.PingAlive(connector_logger, connector_id: str, api, get_state: Callable[[], Dict | None], set_state: Callable[[str], None], metric, connector_info)[source]

Bases: threading.Thread

Daemon thread that maintains connector heartbeat with OpenCTI platform.

Periodically pings the OpenCTI API to indicate the connector is alive and synchronizes connector state between local and remote instances.

Parameters:
  • connector_logger (logging.Logger) – Logger instance for the connector

  • connector_id (str) – Unique identifier of the connector

  • api (OpenCTIApiClient) – OpenCTI API client instance

  • get_state (Callable[[], Optional[Dict]]) – Function to retrieve current connector state

  • set_state (Callable[[str], None]) – Function to update connector state

  • metric (OpenCTIMetricHandler) – Metric handler for recording ping statistics

  • connector_info (ConnectorInfo) – ConnectorInfo instance with runtime details

Initialize the PingAlive daemon thread.

Parameters:
  • connector_logger (logging.Logger) – Logger instance for the connector

  • connector_id (str) – Unique identifier of the connector

  • api (OpenCTIApiClient) – OpenCTI API client instance

  • get_state (Callable[[], Optional[Dict]]) – Function to retrieve current connector state

  • set_state (Callable[[str], None]) – Function to update connector state

  • metric (OpenCTIMetricHandler) – Metric handler for recording ping statistics

  • connector_info (ConnectorInfo) – ConnectorInfo instance with runtime details

connector_logger[source]
connector_id[source]
in_error = False[source]
api[source]
get_state[source]
set_state[source]
exit_event[source]
metric[source]
connector_info[source]
ping() None[source]

Execute the ping loop to maintain connector heartbeat.

Continuously pings the OpenCTI API every 40 seconds to: - Signal that the connector is alive - Send current connector state and info - Receive and apply any remote state updates

If the remote state differs from local state, the local state is updated to match. This allows state resets from the UI.

The loop continues until the exit_event is set.

run() None[source]

Start the PingAlive thread execution.

Entry point for the thread that initiates the ping loop.

stop() None[source]

Stop the PingAlive thread gracefully.

Sets the exit event to signal the ping loop to terminate.

class pycti.connector.opencti_connector_helper.StreamAlive(helper, q: queue.Queue)[source]

Bases: threading.Thread

Watchdog thread that monitors SSE stream health via heartbeat messages.

Monitors a queue for heartbeat signals from the stream listener. If no heartbeat is received within 45 seconds, the connector is stopped to allow for reconnection.

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • q (Queue) – Queue for receiving heartbeat signals from stream listener

Initialize the StreamAlive watchdog thread.

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • q (Queue) – Queue for receiving heartbeat signals

helper[source]
q[source]
exit_event[source]
run() None[source]

Execute the stream health monitoring loop.

Checks every 5 seconds for heartbeat signals in the queue. If no signal is received for 45 seconds, terminates the connector to trigger a reconnection.

stop() None[source]

Stop the StreamAlive watchdog thread gracefully.

Sets the exit event to signal the monitoring loop to terminate.

class pycti.connector.opencti_connector_helper.RateLimiter(helper, max_per_minute: int)[source]

Rate limiter using sliding window algorithm.

Usage:

# For batch mode (alternative to using max_per_minute in create_batch_callback): batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30

) rate_limiter = helper.create_rate_limiter(max_per_minute=10) rate_limited_callback = rate_limiter.wrap(batch_callback) helper.listen_stream(message_callback=rate_limited_callback)

# For non-batch mode: rate_limiter = helper.create_rate_limiter(max_per_minute=100) rate_limited_callback = rate_limiter.wrap(process_message) helper.listen_stream(message_callback=rate_limited_callback)

Initialize the rate limiter.

Parameters:
  • helper – OpenCTIConnectorHelper instance for logging

  • max_per_minute – Maximum number of calls allowed per minute

helper[source]
max_per_minute[source]
timestamps: collections.deque[source]
set_heartbeat_queue(q: queue.Queue) None[source]

Set the heartbeat queue for sending keepalive signals during rate limiting.

Parameters:

q – Queue used by StreamAlive for heartbeat monitoring

stop() None[source]

Signal the rate limiter to stop waiting.

wait_if_needed() float[source]

Wait if rate limit is exceeded. Thread-safe.

Uses a sliding window algorithm to enforce max_per_minute. Sleeps if necessary to stay within the limit.

Returns:

Time spent waiting (seconds), 0 if no wait needed

wrap(callback: Callable) RateLimitedCallback[source]

Wrap a callback with rate limiting.

Parameters:

callback – The callback to wrap (can be a function or BatchCallbackWrapper)

Returns:

A callable that applies rate limiting before calling the original callback

class pycti.connector.opencti_connector_helper.RateLimitedCallback(rate_limiter: RateLimiter, callback: Callable)[source]

A callback wrapper that applies rate limiting.

This class wraps any callable and applies rate limiting before each call. It preserves the stop() and set_heartbeat_queue() methods from the wrapped callback if they exist.

Initialize the rate-limited callback.

Parameters:
  • rate_limiter – The RateLimiter instance to use

  • callback – The callback to wrap

__call__(*args, **kwargs)[source]

Call the wrapped callback with rate limiting.

stop() None[source]

Stop both the rate limiter and the wrapped callback.

set_heartbeat_queue(q: queue.Queue) None[source]

Set heartbeat queue on both rate limiter and wrapped callback.

property manages_state: bool[source]

Propagate manages_state from wrapped callback.

class pycti.connector.opencti_connector_helper.BatchCallbackWrapper(helper, batch_callback: Callable, batch_size: int | None = None, batch_timeout: float | None = None)[source]

Wraps a batch callback to work with single-message listen_stream.

This class accumulates individual messages and processes them in batches based on batch_size or batch_timeout conditions. It can be used as a callback with the listen_stream method to enable batch processing.

A dedicated timer thread handles all batch processing to avoid race conditions between size-triggered and timeout-triggered batches.

For rate limiting, use the max_per_minute parameter in create_batch_callback().

Usage:

# Basic batch processing: batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30

) helper.listen_stream(message_callback=batch_callback)

# With rate limiting (recommended): batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30, max_per_minute=10

) helper.listen_stream(message_callback=batch_callback)

Initialize the batch callback wrapper.

Parameters:
  • helper – OpenCTIConnectorHelper instance

  • batch_callback – Function to call with batched events

  • batch_size – Process batch when this many events accumulated

  • batch_timeout – Process batch after this many seconds

manages_state = True[source]
helper[source]
batch_callback[source]
batch_size = None[source]
batch_timeout = None[source]
batch: List = [][source]
batch_start_time: float | None = None[source]
__call__(msg) None[source]

Accumulate a message into the current batch.

This method only accumulates messages; batch processing is handled by the dedicated timer thread to avoid race conditions.

Parameters:

msg – SSE message object from the stream

stop() None[source]

Stop the batch wrapper and process remaining messages.

This method signals the timer thread to stop and waits for it to finish processing any remaining messages in the batch.

set_heartbeat_queue(q: queue.Queue) None[source]

Set the heartbeat queue for sending keepalive signals during batch processing.

Parameters:

q – Queue used by StreamAlive for heartbeat monitoring

class pycti.connector.opencti_connector_helper.ListenStream(helper, callback: Callable, url: str, token: str, verify_ssl: bool, start_timestamp: str | None, live_stream_id: str | None, listen_delete: bool, no_dependencies: bool, recover_iso_date: str | None, with_inferences: bool)[source]

Bases: threading.Thread

Thread class for consuming events from OpenCTI SSE stream.

Connects to an OpenCTI event stream and processes events through a callback function. Supports recovery from a specific point in time and various filtering options.

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • callback (Callable) – Function to call for each stream event

  • url (str) – Base URL for the stream endpoint

  • token (str) – Authentication token for the stream

  • verify_ssl (bool) – Whether to verify SSL certificates

  • start_timestamp (str or None) – Timestamp to start reading from (format: “timestamp-0”)

  • live_stream_id (str or None) – ID of the specific stream to connect to

  • listen_delete (bool) – Whether to receive delete events

  • no_dependencies (bool) – Whether to exclude dependency objects

  • recover_iso_date (str or None) – ISO date to recover events from

  • with_inferences (bool) – Whether to include inferred relationships

Initialize the ListenStream thread.

Parameters:
  • helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance

  • callback (Callable) – Function to process stream events

  • url (str) – Base URL for the stream endpoint

  • token (str) – Authentication token

  • verify_ssl (bool) – Whether to verify SSL certificates

  • start_timestamp (str or None) – Starting timestamp for stream position

  • live_stream_id (str or None) – Specific stream ID to connect to

  • listen_delete (bool) – Whether to receive delete events

  • no_dependencies (bool) – Whether to exclude dependencies

  • recover_iso_date (str or None) – ISO date for event recovery

  • with_inferences (bool) – Whether to include inferences

helper[source]
callback[source]
url[source]
token[source]
verify_ssl[source]
start_timestamp[source]
live_stream_id[source]
listen_delete[source]
no_dependencies[source]
recover_iso_date[source]
with_inferences[source]
exit_event[source]
run() None[source]

Execute the stream listening loop.

Connects to the OpenCTI SSE stream and processes events: - Initializes or restores stream position from connector state - Starts a StreamAlive watchdog for health monitoring - Processes heartbeat, connected, and data events - Updates connector state with latest event ID - Calls the callback function for each data event

The loop continues until stopped or an error occurs.

stop()[source]

Stop the ListenStream thread.

This method sets the exit event to signal the stream listening thread to stop. If the callback has a stop method (e.g., BatchCallbackWrapper or RateLimitedCallback), it will be called to ensure proper cleanup.

class pycti.connector.opencti_connector_helper.ConnectorInfo(run_and_terminate: bool = False, buffering: bool = False, queue_threshold: float = 500.0, queue_messages_size: float = 0.0, next_run_datetime: datetime = None, last_run_datetime: datetime = None)[source]

Container for connector runtime information and status.

Stores runtime metrics and status information about the connector, which is sent to the OpenCTI platform during ping operations.

Parameters:
  • run_and_terminate (bool) – Whether connector runs once and terminates

  • buffering (bool) – Whether connector is currently buffering due to queue limits

  • queue_threshold (float) – Maximum allowed queue size in MB before buffering

  • queue_messages_size (float) – Current size of queued messages in MB

  • next_run_datetime (datetime or None) – Scheduled datetime for next connector run

  • last_run_datetime (datetime or None) – Datetime of the last connector run

Example

>>> info = ConnectorInfo(run_and_terminate=False, queue_threshold=500.0)
>>> info.buffering = True
>>> info.queue_messages_size = 450.0
>>> details = info.all_details

Initialize ConnectorInfo with runtime parameters.

Parameters:
  • run_and_terminate (bool) – Whether connector runs once and terminates

  • buffering (bool) – Whether connector is buffering

  • queue_threshold (float) – Maximum queue size in MB

  • queue_messages_size (float) – Current queue size in MB

  • next_run_datetime (datetime or None) – Next scheduled run time

  • last_run_datetime (datetime or None) – Last run time

property all_details[source]

Get all connector information details as a dictionary.

Returns:

Dictionary containing all connector status information

Return type:

dict

property run_and_terminate: bool[source]

Get the run_and_terminate flag.

Returns:

Whether the connector runs once and terminates

Return type:

bool

property buffering: bool[source]

Get the buffering status.

Returns:

Whether the connector is currently buffering

Return type:

bool

property queue_threshold: float[source]

Get the queue threshold value.

Returns:

Maximum allowed queue size in MB

Return type:

float

property queue_messages_size: float[source]

Get the current queue messages size.

Returns:

Current size of queued messages in MB

Return type:

float

property next_run_datetime: datetime[source]

Get the next scheduled run datetime.

Returns:

Datetime for the next scheduled run, or None if not scheduled

Return type:

datetime or None

property last_run_datetime: datetime[source]

Get the last run datetime.

Returns:

Datetime of the last connector run, or None if never run

Return type:

datetime or None

class pycti.connector.opencti_connector_helper.OpenCTIConnectorHelper(config: Dict, playbook_compatible: bool = False)[source]

Main helper class for developing OpenCTI connectors.

Provides a comprehensive API for connector development, handling: - Connector registration and configuration - Message queue communication (RabbitMQ/API) - SSE stream consumption - STIX2 bundle creation and submission - Scheduling and lifecycle management - Metrics and logging

Parameters:
  • config (Dict) – Configuration dictionary containing OpenCTI and connector settings

  • playbook_compatible (bool) – Whether the connector can be used in playbooks

Example

>>> config = {
...     "opencti": {"url": "http://localhost:8080", "token": "xxx"},
...     "connector": {"id": "xxx", "name": "My Connector", "type": "EXTERNAL_IMPORT"}
... }
>>> helper = OpenCTIConnectorHelper(config)
>>> helper.listen(my_callback_function)
api[source]

OpenCTI API client for connector operations

api_impersonate[source]

API client that impersonates the request applicant

connector_logger[source]

Logger instance for connector messages

connector_info[source]

Runtime information about the connector

metric[source]

Prometheus metric handler

Initialize the OpenCTIConnectorHelper.

Parameters:
  • config (Dict) – Configuration dictionary with OpenCTI and connector settings

  • playbook_compatible (bool) – Whether the connector can be used in playbooks

class TimeUnit(*args, **kwds)[source]

Bases: enum.Enum

Time unit enumeration for scheduling intervals (deprecated).

Use ISO 8601 duration format with schedule_iso() instead.

Variables:
  • SECONDS – 1 second

  • MINUTES – 60 seconds

  • HOURS – 3600 seconds

  • DAYS – 86400 seconds

  • WEEKS – 604800 seconds

  • YEARS – 31536000 seconds

SECONDS = 1[source]
MINUTES = 60[source]
HOURS = 3600[source]
DAYS = 86400[source]
WEEKS = 604800[source]
YEARS = 31536000[source]
stream_collections[source]
config[source]
opencti_url = None[source]
opencti_token = None[source]
opencti_custom_headers = None[source]
opencti_ssl_verify = False[source]
opencti_json_logging = True[source]
connect_id = None[source]
connect_auto_create_service_account = False[source]
connect_auto_create_service_account_confidence_level = 50[source]
listen_protocol = ''[source]
listen_protocol_api_port = 7070[source]
listen_protocol_api_path = '/api/callback'[source]
listen_protocol_api_ssl = False[source]
listen_protocol_api_uri = 'https://127.0.0.1:7070'[source]
connect_type = None[source]
connect_queue_threshold = 500[source]
connect_duration_period = None[source]
connect_live_stream_id = None[source]
connect_live_stream_listen_delete = True[source]
connect_live_stream_no_dependencies = True[source]
connect_live_stream_with_inferences = False[source]
connect_live_stream_recover_iso_date = None[source]
connect_live_stream_start_timestamp = None[source]
connect_name = None[source]
connect_confidence_level = None[source]
connect_scope = 'not-applicable'[source]
connect_auto = False[source]
connect_auto_update = False[source]
connect_enrichment_resolution = 'none'[source]
bundle_send_to_queue = True[source]
bundle_send_to_directory = False[source]
bundle_send_to_directory_path = None[source]
bundle_send_to_directory_retention = 7[source]
bundle_send_to_s3 = False[source]
bundle_send_to_s3_bucket = None[source]
bundle_send_to_s3_folder = 'connectors'[source]
bundle_send_to_s3_retention = 7[source]
s3_endpoint = None[source]
s3_port = None[source]
s3_access_key = None[source]
s3_secret_key = None[source]
s3_use_ssl = None[source]
s3_bucket_region = None[source]
connect_only_contextual = False[source]
connect_xtm_one_intent = None[source]
log_level = ''[source]
connect_run_and_terminate = False[source]
connect_validate_before_import = False[source]
scheduler[source]
connector_info[source]
api[source]
api_impersonate[source]
connector_logger[source]
log_debug[source]
log_info[source]
log_warning[source]
log_error[source]
metric[source]
connector[source]
work_id = None[source]
validation_mode = 'workbench'[source]
force_validation = False[source]
draft_id = None[source]
playbook = None[source]
enrichment_shared_organizations = None[source]
connector_id[source]
applicant_id[source]
connector_state[source]
connector_config[source]
queue_protocol = None[source]
listen_queue = None[source]
stop() None[source]

Stop the connector and clean up resources.

This method stops all running threads (listen queue, ping thread) and unregisters the connector from OpenCTI.

get_name() bool | int | str | None[source]

Get the connector name.

Returns:

The name of the connector

Return type:

Optional[Union[bool, int, str]]

get_stream_collection()[source]

Get the stream collection configuration.

Returns:

Stream collection configuration dictionary

Return type:

dict

Raises:

ValueError – If no stream is connected

get_only_contextual() bool | int | str | None[source]

Get the only_contextual configuration value.

Returns:

Whether the connector processes only contextual data

Return type:

Optional[Union[bool, int, str]]

get_run_and_terminate() bool | int | str | None[source]

Get the run_and_terminate configuration value.

Returns:

Whether the connector should run once and terminate

Return type:

Optional[Union[bool, int, str]]

get_validate_before_import() bool | int | str | None[source]

Get the validate_before_import configuration value.

Returns:

Whether to validate data before importing

Return type:

Optional[Union[bool, int, str]]

set_state(state) None[source]

Set the connector state.

Stores the connector state as a JSON string for persistence across runs. The state can be retrieved later using get_state().

Parameters:

state (Dict or None) – State object to store, or None to clear the state

get_state() Dict | None[source]

Get the connector state.

Retrieves the current connector state that was previously stored. The state is used to track progress and resume operations across runs.

Returns:

The current state of the connector, or None if no state exists

Return type:

Optional[Dict]

force_ping()[source]

Force a ping to the OpenCTI API to update connector state.

This method manually triggers a ping to synchronize the connector state with the OpenCTI platform.

next_run_datetime(duration_period_in_seconds: int | float) None[source]

Calculate and set the next scheduled run datetime in ISO format.

Parameters:

duration_period_in_seconds (Union[int, float]) – Duration in seconds until next run

last_run_datetime() None[source]

Set the last run datetime to the current UTC time in ISO format.

check_connector_buffering() bool[source]

Check if the RabbitMQ queue has exceeded the allowed threshold.

Returns:

True if queue size exceeds threshold, False otherwise

Return type:

bool

schedule_unit(message_callback: Callable[[], None], duration_period: int | float | str, time_unit: TimeUnit) None[source]

Schedule connector execution with a time unit (deprecated).

This method manages backward compatibility of intervals on connectors. Use schedule_iso method instead.

Parameters:
  • message_callback (Callable[[], None]) – The connector process callback function

  • duration_period (Union[int, float, str]) – The connector interval value

  • time_unit (TimeUnit) – The unit of time (YEARS, WEEKS, DAYS, HOURS, MINUTES, SECONDS)

schedule_iso(message_callback: Callable[[], None], duration_period: str) None[source]

Schedule connector execution using ISO 8601 duration format.

Parameters:
  • message_callback (Callable[[], None]) – The connector process callback function

  • duration_period (str) – Duration in ISO 8601 format (e.g., “P18Y9W4DT11H9M8S”)

schedule_process(message_callback: Callable[[], None], duration_period: int | float) None[source]

Schedule the execution of a connector process.

If duration_period is zero or connect_run_and_terminate is True, the process will run once and terminate. Otherwise, it schedules the next run based on the interval.

Parameters:
  • message_callback (Callable[[], None]) – The connector process callback function

  • duration_period (Union[int, float]) – The connector’s interval in seconds

listen(message_callback: Callable[[Dict], str]) None[source]

Listen for messages from the queue and process them via callback.

Starts a listener thread that consumes messages from RabbitMQ or HTTP API (depending on configured listen protocol) and processes each message through the provided callback function. This method blocks until the listener is stopped.

Parameters:

message_callback (Callable[[Dict], str]) – Function to process incoming messages. Receives event data dict and should return a status message string.

listen_stream(message_callback: Callable, url: str | None = None, token: str | None = None, verify_ssl: bool | None = None, start_timestamp: str | None = None, live_stream_id: str | None = None, listen_delete: bool | None = None, no_dependencies: bool | None = None, recover_iso_date: str | None = None, with_inferences: bool | None = None) ListenStream[source]

Start listening to an OpenCTI event stream.

Connects to an SSE stream and processes events through the callback. Parameters default to connector configuration values if not specified.

Parameters:
  • message_callback (Callable) – Function to call for each stream event

  • url (str or None) – Base URL for stream (defaults to opencti_url)

  • token (str or None) – Authentication token (defaults to opencti_token)

  • verify_ssl (bool or None) – Whether to verify SSL certificates

  • start_timestamp (str or None) – Stream position to start from

  • live_stream_id (str or None) – Specific stream ID to connect to

  • listen_delete (bool or None) – Whether to receive delete events

  • no_dependencies (bool or None) – Whether to exclude dependencies

  • recover_iso_date (str or None) – ISO date to recover events from

  • with_inferences (bool or None) – Whether to include inferred data

Returns:

The started ListenStream thread

Return type:

ListenStream

create_batch_callback(batch_callback: Callable, batch_size: int | None = None, batch_timeout: float | None = None, max_per_minute: int | None = None) BatchCallbackWrapper[source]

Create a callback wrapper that batches messages.

This factory method creates a BatchCallbackWrapper that can be used with listen_stream to enable batch processing of events.

For rate limiting, use the max_per_minute parameter (recommended).

Usage:

# Basic batch processing: batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30

) helper.listen_stream(message_callback=batch_callback)

# With rate limiting (recommended): batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30, max_per_minute=10

) helper.listen_stream(message_callback=batch_callback)

The batch callback receives a dictionary with the following structure:
{

“events”: [list of SSE messages], “batch_metadata”: {

“batch_size”: int, “trigger_reason”: str, # “size_limit”, “timeout”, “shutdown” “elapsed_time”: float, “timestamp”: float,

}

}

Parameters:
  • batch_callback (Callable[[dict], None]) – Function to call with batched events

  • batch_size (int or None) – Process batch when this many events accumulated (optional)

  • batch_timeout (float or None) – Process batch after this many seconds (optional)

  • max_per_minute (int or None) – Maximum batch callbacks per minute (optional)

Returns:

BatchCallbackWrapper instance for use with listen_stream

Return type:

BatchCallbackWrapper

Raises:
  • ValueError – If neither batch_size nor batch_timeout is specified

  • ValueError – If batch_size is not a positive integer

  • ValueError – If batch_timeout is not a positive number

  • ValueError – If max_per_minute is not a positive integer

create_rate_limiter(max_per_minute: int) RateLimiter[source]

Create a rate limiter that can wrap any callback.

The rate limiter uses a sliding window algorithm to enforce a maximum number of calls per minute. It can be used with both batch and non-batch callbacks.

Usage:

# With batch callback: batch_callback = helper.create_batch_callback(

process_batch_func, batch_size=100, batch_timeout=30

) rate_limiter = helper.create_rate_limiter(max_per_minute=10) rate_limited = rate_limiter.wrap(batch_callback) helper.listen_stream(message_callback=rate_limited)

# With non-batch callback: rate_limiter = helper.create_rate_limiter(max_per_minute=100) rate_limited = rate_limiter.wrap(process_message) helper.listen_stream(message_callback=rate_limited)

Parameters:

max_per_minute (int) – Maximum number of calls allowed per 60-second window

Returns:

RateLimiter instance that can wrap callbacks

Return type:

RateLimiter

Raises:

ValueError – If max_per_minute is not a positive integer

get_opencti_url() bool | int | str | None[source]

Get the OpenCTI URL.

Returns:

The URL of the OpenCTI platform

Return type:

Optional[Union[bool, int, str]]

get_opencti_token() bool | int | str | None[source]

Get the OpenCTI API token.

Returns:

The API token for OpenCTI authentication

Return type:

Optional[Union[bool, int, str]]

get_connector() pycti.connector.opencti_connector.OpenCTIConnector[source]

Get the OpenCTIConnector instance.

Returns:

The OpenCTIConnector instance

Return type:

OpenCTIConnector

date_now() str[source]

Get the current UTC datetime in ISO 8601 format.

Returns the current time with timezone offset notation (+00:00).

Returns:

Current UTC datetime as ISO 8601 string (e.g., “2024-01-15T10:30:00+00:00”)

Return type:

str

Example

>>> helper.date_now()
'2024-01-15T10:30:00+00:00'
date_now_z() str[source]

Get the current UTC datetime in ISO 8601 format with Z suffix.

Returns the current time with ‘Z’ suffix instead of ‘+00:00’. This format is commonly used in STIX objects.

Returns:

Current UTC datetime as ISO 8601 string (e.g., “2024-01-15T10:30:00Z”)

Return type:

str

Example

>>> helper.date_now_z()
'2024-01-15T10:30:00Z'
send_stix2_bundle(bundle: str, **kwargs) list[source]

Send a STIX2 bundle to the OpenCTI platform.

Processes and sends a STIX2 bundle to OpenCTI via the message queue or API. The bundle is split into smaller chunks and sent with proper sequencing. Supports validation workflows, draft mode, and directory export.

Parameters:
  • bundle (str) – Valid STIX2 bundle as a JSON string

  • work_id (str, optional) – Work ID for tracking the import job (default: self.work_id)

  • validation_mode (str, optional) – Validation mode - “workbench” or “draft” (default: self.validation_mode)

  • draft_id (str, optional) – Draft context ID to send the bundle to (default: self.draft_id)

  • entities_types (list, optional) – List of entity types to filter (default: None)

  • update (bool, optional) – Whether to update existing data in the database (default: False)

  • event_version (str, optional) – Event version for the bundle (default: None)

  • bypass_validation (bool, optional) – Skip validation workflow (default: False)

  • force_validation (bool, optional) – Force validation even if not configured (default: self.force_validation)

  • entity_id (str, optional) – Entity ID for context (default: None)

  • file_markings (list, optional) – File markings to apply (default: None)

  • file_name (str, optional) – File name for workbench upload (default: None)

  • send_to_queue (bool, optional) – Whether to send to message queue (default: self.bundle_send_to_queue)

  • cleanup_inconsistent_bundle (bool, optional) – Clean up inconsistent bundle data (default: False)

  • send_to_directory (bool, optional) – Whether to write bundle to directory (default: self.bundle_send_to_directory)

  • send_to_directory_path (str, optional) – Directory path for bundle export (default: self.bundle_send_to_directory_path)

  • send_to_directory_retention (int, optional) – Days to retain exported files (default: self.bundle_send_to_directory_retention)

  • send_to_s3 (bool, optional) – Whether to upload bundle to S3 (default: self.bundle_send_to_s3)

  • no_split (bool, optional) – Whether to send without splitting (default: False)

Returns:

List of processed bundle chunks

Return type:

list

Raises:

ValueError – If the bundle is empty or contains no valid objects

static stix2_deduplicate_objects(items) list[source]

Deduplicate STIX2 objects by their ID.

Removes duplicate STIX2 objects from a list, keeping only the first occurrence of each unique ID.

Parameters:

items (list) – List of STIX2 objects to deduplicate

Returns:

Deduplicated list of STIX2 objects

Return type:

list

static stix2_create_bundle(items) str | None[source]

Create a STIX2 bundle from a list of objects.

Wraps STIX2 objects in a valid bundle structure with a generated UUID. Automatically serializes objects if they are STIX2 library instances.

Parameters:

items (list) – List of STIX2 objects (dicts or STIX2 library objects)

Returns:

JSON string of the STIX2 bundle, or None if items is empty

Return type:

Optional[str]

static check_max_tlp(tlp: str, max_tlp: str) bool[source]

Check if a TLP level is within the allowed maximum TLP level.

Validates that the given TLP marking is at or below the maximum allowed TLP level. Useful for filtering data based on sharing restrictions.

Parameters:
  • tlp (str) – The TLP level to check (e.g., “TLP:GREEN”, “TLP:AMBER”)

  • max_tlp (str) – The highest allowed TLP level for comparison

Returns:

True if the TLP level is within the allowed range, False otherwise

Return type:

bool

Example

>>> OpenCTIConnectorHelper.check_max_tlp("TLP:GREEN", "TLP:AMBER")
True
>>> OpenCTIConnectorHelper.check_max_tlp("TLP:RED", "TLP:GREEN")
False
static get_attribute_in_extension(key: str, stix_object: Dict) any[source]

Get an attribute from OpenCTI STIX extensions.

Retrieves a value from OpenCTI’s custom STIX extension definitions. Checks both the primary OpenCTI extension and the SDO extension, falling back to the object’s root attributes if not found in extensions.

Parameters:
  • key (str) – The attribute key to retrieve

  • stix_object (Dict) – A STIX object dictionary

Returns:

The attribute value, or None if not found

Return type:

any

Example

>>> obj = {"extensions": {"extension-definition--ea279b3e-...": {"score": 85}}}
>>> OpenCTIConnectorHelper.get_attribute_in_extension("score", obj)
85
static get_attribute_in_mitre_extension(key: str, stix_object: Dict) any[source]

Get an attribute from MITRE ATT&CK STIX extension.

Retrieves a value from the MITRE ATT&CK custom STIX extension definition used for attack patterns and techniques.

Parameters:
  • key (str) – The attribute key to retrieve

  • stix_object (Dict) – A STIX object dictionary

Returns:

The attribute value, or None if not found

Return type:

any

Example

>>> obj = {"extensions": {"extension-definition--322b8f77-...": {"x_mitre_version": "1.0"}}}
>>> OpenCTIConnectorHelper.get_attribute_in_mitre_extension("x_mitre_version", obj)
'1.0'
get_data_from_enrichment(data, standard_id, opencti_entity)[source]

Extract STIX entity and objects from enrichment data.

Parameters:
  • data (dict) – The enrichment data containing a bundle

  • standard_id (str) – The STIX standard ID of the entity

  • opencti_entity (dict) – The OpenCTI entity object

Returns:

Dictionary containing stix_entity and stix_objects

Return type:

dict