pycti.connector.opencti_connector_helper
OpenCTI Connector Helper module.
This module provides the main helper class and utilities for building OpenCTI connectors. It handles connector registration, message queue communication, stream listening, scheduling, and STIX2 bundle processing.
- Key components:
OpenCTIConnectorHelper: Main class for connector development
ListenQueue: Handles RabbitMQ message consumption
ListenStream: Handles SSE stream consumption
PingAlive: Maintains connector heartbeat with the platform
ConnectorInfo: Stores connector runtime information
Example
>>> from pycti import OpenCTIConnectorHelper
>>> helper = OpenCTIConnectorHelper(config)
>>> helper.listen(callback_function)
- class pycti.connector.opencti_connector_helper.BatchCallbackWrapper(helper, batch_callback, batch_size=None, batch_timeout=None)[source]
Wraps a batch callback to work with single-message listen_stream.
This class accumulates individual messages and processes them in batches based on batch_size or batch_timeout conditions. It can be used as a callback with the listen_stream method to enable batch processing.
A dedicated timer thread handles all batch processing to avoid race conditions between size-triggered and timeout-triggered batches.
For rate limiting, use the max_per_minute parameter in create_batch_callback().
- Usage:
# Basic batch processing: batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30
) helper.listen_stream(message_callback=batch_callback)
# With rate limiting (recommended): batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30, max_per_minute=10
) helper.listen_stream(message_callback=batch_callback)
- __call__(msg)[source]
Accumulate a message into the current batch.
This method only accumulates messages; batch processing is handled by the dedicated timer thread to avoid race conditions.
- Parameters:
msg – SSE message object from the stream
- Return type:
None
- __init__(helper, batch_callback, batch_size=None, batch_timeout=None)[source]
Initialize the batch callback wrapper.
- Parameters:
helper – OpenCTIConnectorHelper instance
batch_callback (
Callable) – Function to call with batched eventsbatch_size (
Optional[int]) – Process batch when this many events accumulatedbatch_timeout (
Optional[float]) – Process batch after this many seconds
- class pycti.connector.opencti_connector_helper.ConnectorInfo(run_and_terminate=False, buffering=False, queue_threshold=500.0, queue_messages_size=0.0, next_run_datetime=None, last_run_datetime=None)[source]
Container for connector runtime information and status.
Stores runtime metrics and status information about the connector, which is sent to the OpenCTI platform during ping operations.
- Parameters:
run_and_terminate (bool) – Whether connector runs once and terminates
buffering (bool) – Whether connector is currently buffering due to queue limits
queue_threshold (float) – Maximum allowed queue size in MB before buffering
queue_messages_size (float) – Current size of queued messages in MB
next_run_datetime (datetime or None) – Scheduled datetime for next connector run
last_run_datetime (datetime or None) – Datetime of the last connector run
Example
>>> info = ConnectorInfo(run_and_terminate=False, queue_threshold=500.0) >>> info.buffering = True >>> info.queue_messages_size = 450.0 >>> details = info.all_details
- __init__(run_and_terminate=False, buffering=False, queue_threshold=500.0, queue_messages_size=0.0, next_run_datetime=None, last_run_datetime=None)[source]
Initialize ConnectorInfo with runtime parameters.
- Parameters:
run_and_terminate (bool) – Whether connector runs once and terminates
buffering (bool) – Whether connector is buffering
queue_threshold (float) – Maximum queue size in MB
queue_messages_size (float) – Current queue size in MB
next_run_datetime (datetime or None) – Next scheduled run time
last_run_datetime (datetime or None) – Last run time
- property all_details[source]
Get all connector information details as a dictionary.
- Returns:
Dictionary containing all connector status information
- Return type:
dict
- property buffering: bool[source]
Get the buffering status.
- Returns:
Whether the connector is currently buffering
- Return type:
bool
- property last_run_datetime: <module 'datetime' from '/home/docs/.asdf/installs/python/3.12.12/lib/python3.12/datetime.py'>[source]
Get the last run datetime.
- Returns:
Datetime of the last connector run, or None if never run
- Return type:
datetime or None
- property next_run_datetime: <module 'datetime' from '/home/docs/.asdf/installs/python/3.12.12/lib/python3.12/datetime.py'>[source]
Get the next scheduled run datetime.
- Returns:
Datetime for the next scheduled run, or None if not scheduled
- Return type:
datetime or None
- property queue_messages_size: float[source]
Get the current queue messages size.
- Returns:
Current size of queued messages in MB
- Return type:
float
- pycti.connector.opencti_connector_helper.FALSY: List[str] = ['no', 'false', 'False'][source]
List of string values considered as boolean False.
- class pycti.connector.opencti_connector_helper.ListenQueue(helper, opencti_token, config, connector_config, applicant_id, listen_protocol, listen_protocol_api_ssl, listen_protocol_api_path, listen_protocol_api_port, callback)[source]
Thread class for consuming messages from RabbitMQ or HTTP API.
Handles message consumption from either RabbitMQ (AMQP protocol) or an HTTP API endpoint, depending on the configured listen protocol. Messages are processed through a callback function provided at initialization.
- This class supports two listen protocols:
AMQP: Connects to RabbitMQ and consumes messages from a queue
API: Starts an HTTP server and receives messages via POST requests
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
opencti_token (str) – Authentication token for OpenCTI API
config (Dict) – Global configuration dictionary
connector_config (Dict) – Connector-specific configuration from registration
applicant_id (str) – ID of the user/connector making requests
listen_protocol (str) – Protocol to use (“AMQP” or “API”)
listen_protocol_api_ssl (bool) – Whether to use SSL for API protocol
listen_protocol_api_path (str) – URL path for API endpoint
listen_protocol_api_port (int) – Port for API server
callback (Callable[[Dict], str]) – Function to call when processing messages
- __init__(helper, opencti_token, config, connector_config, applicant_id, listen_protocol, listen_protocol_api_ssl, listen_protocol_api_path, listen_protocol_api_port, callback)[source]
Initialize the ListenQueue thread.
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
opencti_token (str) – Authentication token for OpenCTI API
config (Dict) – Global configuration dictionary
connector_config (Dict) – Connector configuration from registration
applicant_id (str) – ID of the user/connector making requests
listen_protocol (str) – Protocol to use (“AMQP” or “API”)
listen_protocol_api_ssl (bool) – Whether to use SSL for API protocol
listen_protocol_api_path (str) – URL path for API endpoint
listen_protocol_api_port (int) – Port for API server
callback (Callable[[Dict], str]) – Function to process received messages
- is_token_valid(token)[source]
Returns True if the signature is valid and token is not expired. Returns False otherwise.
- run()[source]
Execute the message listening thread.
Starts the appropriate listener based on the configured protocol: - AMQP: Connects to RabbitMQ and consumes messages from the queue - API: Starts a FastAPI/Uvicorn HTTP server to receive messages
The thread runs until stopped via the stop() method or an error occurs.
- Raises:
ValueError – If an unsupported listen protocol is configured
- Return type:
None
- class pycti.connector.opencti_connector_helper.ListenStream(helper, callback, url, token, verify_ssl, start_timestamp, live_stream_id, listen_delete, no_dependencies, recover_iso_date, with_inferences)[source]
Thread class for consuming events from OpenCTI SSE stream.
Connects to an OpenCTI event stream and processes events through a callback function. Supports recovery from a specific point in time and various filtering options.
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
callback (Callable) – Function to call for each stream event
url (str) – Base URL for the stream endpoint
token (str) – Authentication token for the stream
verify_ssl (bool) – Whether to verify SSL certificates
start_timestamp (str or None) – Timestamp to start reading from (format: “timestamp-0”)
live_stream_id (str or None) – ID of the specific stream to connect to
listen_delete (bool) – Whether to receive delete events
no_dependencies (bool) – Whether to exclude dependency objects
recover_iso_date (str or None) – ISO date to recover events from
with_inferences (bool) – Whether to include inferred relationships
- __init__(helper, callback, url, token, verify_ssl, start_timestamp, live_stream_id, listen_delete, no_dependencies, recover_iso_date, with_inferences)[source]
Initialize the ListenStream thread.
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
callback (Callable) – Function to process stream events
url (str) – Base URL for the stream endpoint
token (str) – Authentication token
verify_ssl (bool) – Whether to verify SSL certificates
start_timestamp (str or None) – Starting timestamp for stream position
live_stream_id (str or None) – Specific stream ID to connect to
listen_delete (bool) – Whether to receive delete events
no_dependencies (bool) – Whether to exclude dependencies
recover_iso_date (str or None) – ISO date for event recovery
with_inferences (bool) – Whether to include inferences
- run()[source]
Execute the stream listening loop.
Connects to the OpenCTI SSE stream and processes events: - Initializes or restores stream position from connector state - Starts a StreamAlive watchdog for health monitoring - Processes heartbeat, connected, and data events - Updates connector state with latest event ID - Calls the callback function for each data event
The loop continues until stopped or an error occurs.
- Return type:
None
- class pycti.connector.opencti_connector_helper.OpenCTIConnectorHelper(config, playbook_compatible=False)[source]
Main helper class for developing OpenCTI connectors.
Provides a comprehensive API for connector development, handling: - Connector registration and configuration - Message queue communication (RabbitMQ/API) - SSE stream consumption - STIX2 bundle creation and submission - Scheduling and lifecycle management - Metrics and logging
- Parameters:
config (Dict) – Configuration dictionary containing OpenCTI and connector settings
playbook_compatible (bool) – Whether the connector can be used in playbooks
Example
>>> config = { ... "opencti": {"url": "http://localhost:8080", "token": "xxx"}, ... "connector": {"id": "xxx", "name": "My Connector", "type": "EXTERNAL_IMPORT"} ... } >>> helper = OpenCTIConnectorHelper(config) >>> helper.listen(my_callback_function)
- class TimeUnit(*values)[source]
Time unit enumeration for scheduling intervals (deprecated).
Use ISO 8601 duration format with schedule_iso() instead.
- Variables:
SECONDS – 1 second
MINUTES – 60 seconds
HOURS – 3600 seconds
DAYS – 86400 seconds
WEEKS – 604800 seconds
YEARS – 31536000 seconds
- __init__(config, playbook_compatible=False)[source]
Initialize the OpenCTIConnectorHelper.
- Parameters:
config (Dict) – Configuration dictionary with OpenCTI and connector settings
playbook_compatible (bool) – Whether the connector can be used in playbooks
- check_connector_buffering()[source]
Check if the RabbitMQ queue has exceeded the allowed threshold.
- Returns:
True if queue size exceeds threshold, False otherwise
- Return type:
bool
- static check_max_tlp(tlp, max_tlp)[source]
Check if a TLP level is within the allowed maximum TLP level.
Validates that the given TLP marking is at or below the maximum allowed TLP level. Useful for filtering data based on sharing restrictions.
- Parameters:
tlp (str) – The TLP level to check (e.g., “TLP:GREEN”, “TLP:AMBER”)
max_tlp (str) – The highest allowed TLP level for comparison
- Returns:
True if the TLP level is within the allowed range, False otherwise
- Return type:
bool
Example
>>> OpenCTIConnectorHelper.check_max_tlp("TLP:GREEN", "TLP:AMBER") True >>> OpenCTIConnectorHelper.check_max_tlp("TLP:RED", "TLP:GREEN") False
- create_batch_callback(batch_callback, batch_size=None, batch_timeout=None, max_per_minute=None)[source]
Create a callback wrapper that batches messages.
This factory method creates a BatchCallbackWrapper that can be used with listen_stream to enable batch processing of events.
For rate limiting, use the max_per_minute parameter (recommended).
- Usage:
# Basic batch processing: batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30
) helper.listen_stream(message_callback=batch_callback)
# With rate limiting (recommended): batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30, max_per_minute=10
) helper.listen_stream(message_callback=batch_callback)
- The batch callback receives a dictionary with the following structure:
- {
“events”: [list of SSE messages], “batch_metadata”: {
“batch_size”: int, “trigger_reason”: str, # “size_limit”, “timeout”, “shutdown” “elapsed_time”: float, “timestamp”: float,
}
}
- Parameters:
batch_callback (Callable[[dict], None]) – Function to call with batched events
batch_size (int or None) – Process batch when this many events accumulated (optional)
batch_timeout (float or None) – Process batch after this many seconds (optional)
max_per_minute (int or None) – Maximum batch callbacks per minute (optional)
- Returns:
BatchCallbackWrapper instance for use with listen_stream
- Return type:
- Raises:
ValueError – If neither batch_size nor batch_timeout is specified
ValueError – If batch_size is not a positive integer
ValueError – If batch_timeout is not a positive number
ValueError – If max_per_minute is not a positive integer
- create_rate_limiter(max_per_minute)[source]
Create a rate limiter that can wrap any callback.
The rate limiter uses a sliding window algorithm to enforce a maximum number of calls per minute. It can be used with both batch and non-batch callbacks.
- Usage:
# With batch callback: batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30
) rate_limiter = helper.create_rate_limiter(max_per_minute=10) rate_limited = rate_limiter.wrap(batch_callback) helper.listen_stream(message_callback=rate_limited)
# With non-batch callback: rate_limiter = helper.create_rate_limiter(max_per_minute=100) rate_limited = rate_limiter.wrap(process_message) helper.listen_stream(message_callback=rate_limited)
- Parameters:
max_per_minute (int) – Maximum number of calls allowed per 60-second window
- Returns:
RateLimiter instance that can wrap callbacks
- Return type:
- Raises:
ValueError – If max_per_minute is not a positive integer
- date_now()[source]
Get the current UTC datetime in ISO 8601 format.
Returns the current time with timezone offset notation (+00:00).
- Returns:
Current UTC datetime as ISO 8601 string (e.g., “2024-01-15T10:30:00+00:00”)
- Return type:
str
Example
>>> helper.date_now() '2024-01-15T10:30:00+00:00'
- date_now_z()[source]
Get the current UTC datetime in ISO 8601 format with Z suffix.
Returns the current time with ‘Z’ suffix instead of ‘+00:00’. This format is commonly used in STIX objects.
- Returns:
Current UTC datetime as ISO 8601 string (e.g., “2024-01-15T10:30:00Z”)
- Return type:
str
Example
>>> helper.date_now_z() '2024-01-15T10:30:00Z'
- force_ping()[source]
Force a ping to the OpenCTI API to update connector state.
This method manually triggers a ping to synchronize the connector state with the OpenCTI platform.
- static get_attribute_in_extension(key, stix_object)[source]
Get an attribute from OpenCTI STIX extensions.
Retrieves a value from OpenCTI’s custom STIX extension definitions. Checks both the primary OpenCTI extension and the SDO extension, falling back to the object’s root attributes if not found in extensions.
- Parameters:
key (str) – The attribute key to retrieve
stix_object (Dict) – A STIX object dictionary
- Returns:
The attribute value, or None if not found
- Return type:
any
Example
>>> obj = {"extensions": {"extension-definition--ea279b3e-...": {"score": 85}}} >>> OpenCTIConnectorHelper.get_attribute_in_extension("score", obj) 85
- static get_attribute_in_mitre_extension(key, stix_object)[source]
Get an attribute from MITRE ATT&CK STIX extension.
Retrieves a value from the MITRE ATT&CK custom STIX extension definition used for attack patterns and techniques.
- Parameters:
key (str) – The attribute key to retrieve
stix_object (Dict) – A STIX object dictionary
- Returns:
The attribute value, or None if not found
- Return type:
any
Example
>>> obj = {"extensions": {"extension-definition--322b8f77-...": {"x_mitre_version": "1.0"}}} >>> OpenCTIConnectorHelper.get_attribute_in_mitre_extension("x_mitre_version", obj) '1.0'
- get_connector()[source]
Get the OpenCTIConnector instance.
- Returns:
The OpenCTIConnector instance
- Return type:
- get_data_from_enrichment(data, standard_id, opencti_entity)[source]
Extract STIX entity and objects from enrichment data.
- Parameters:
data (dict) – The enrichment data containing a bundle
standard_id (str) – The STIX standard ID of the entity
opencti_entity (dict) – The OpenCTI entity object
- Returns:
Dictionary containing stix_entity and stix_objects
- Return type:
dict
- get_name()[source]
Get the connector name.
- Returns:
The name of the connector
- Return type:
Optional[Union[bool, int, str]]
- get_only_contextual()[source]
Get the only_contextual configuration value.
- Returns:
Whether the connector processes only contextual data
- Return type:
Optional[Union[bool, int, str]]
- get_opencti_token()[source]
Get the OpenCTI API token.
- Returns:
The API token for OpenCTI authentication
- Return type:
Optional[Union[bool, int, str]]
- get_opencti_url()[source]
Get the OpenCTI URL.
- Returns:
The URL of the OpenCTI platform
- Return type:
Optional[Union[bool, int, str]]
- get_run_and_terminate()[source]
Get the run_and_terminate configuration value.
- Returns:
Whether the connector should run once and terminate
- Return type:
Optional[Union[bool, int, str]]
- get_state()[source]
Get the connector state.
Retrieves the current connector state that was previously stored. The state is used to track progress and resume operations across runs.
- Returns:
The current state of the connector, or None if no state exists
- Return type:
Optional[Dict]
- get_stream_collection()[source]
Get the stream collection configuration.
- Returns:
Stream collection configuration dictionary
- Return type:
dict
- Raises:
ValueError – If no stream is connected
- get_validate_before_import()[source]
Get the validate_before_import configuration value.
- Returns:
Whether to validate data before importing
- Return type:
Optional[Union[bool, int, str]]
- last_run_datetime()[source]
Set the last run datetime to the current UTC time in ISO format.
- Return type:
None
- listen(message_callback)[source]
Listen for messages from the queue and process them via callback.
Starts a listener thread that consumes messages from RabbitMQ or HTTP API (depending on configured listen protocol) and processes each message through the provided callback function. This method blocks until the listener is stopped.
- Parameters:
message_callback (Callable[[Dict], str]) – Function to process incoming messages. Receives event data dict and should return a status message string.
- Return type:
None
- listen_stream(message_callback, url=None, token=None, verify_ssl=None, start_timestamp=None, live_stream_id=None, listen_delete=None, no_dependencies=None, recover_iso_date=None, with_inferences=None)[source]
Start listening to an OpenCTI event stream.
Connects to an SSE stream and processes events through the callback. Parameters default to connector configuration values if not specified.
- Parameters:
message_callback (Callable) – Function to call for each stream event
url (str or None) – Base URL for stream (defaults to opencti_url)
token (str or None) – Authentication token (defaults to opencti_token)
verify_ssl (bool or None) – Whether to verify SSL certificates
start_timestamp (str or None) – Stream position to start from
live_stream_id (str or None) – Specific stream ID to connect to
listen_delete (bool or None) – Whether to receive delete events
no_dependencies (bool or None) – Whether to exclude dependencies
recover_iso_date (str or None) – ISO date to recover events from
with_inferences (bool or None) – Whether to include inferred data
- Returns:
The started ListenStream thread
- Return type:
- next_run_datetime(duration_period_in_seconds)[source]
Calculate and set the next scheduled run datetime in ISO format.
- Parameters:
duration_period_in_seconds (Union[int, float]) – Duration in seconds until next run
- Return type:
None
- schedule_iso(message_callback, duration_period)[source]
Schedule connector execution using ISO 8601 duration format.
- Parameters:
message_callback (Callable[[], None]) – The connector process callback function
duration_period (str) – Duration in ISO 8601 format (e.g., “P18Y9W4DT11H9M8S”)
- Return type:
None
- schedule_process(message_callback, duration_period)[source]
Schedule the execution of a connector process.
If duration_period is zero or connect_run_and_terminate is True, the process will run once and terminate. Otherwise, it schedules the next run based on the interval.
- Parameters:
message_callback (Callable[[], None]) – The connector process callback function
duration_period (Union[int, float]) – The connector’s interval in seconds
- Return type:
None
- schedule_unit(message_callback, duration_period, time_unit)[source]
Schedule connector execution with a time unit (deprecated).
This method manages backward compatibility of intervals on connectors. Use schedule_iso method instead.
- Parameters:
message_callback (Callable[[], None]) – The connector process callback function
duration_period (Union[int, float, str]) – The connector interval value
time_unit (TimeUnit) – The unit of time (YEARS, WEEKS, DAYS, HOURS, MINUTES, SECONDS)
- Return type:
None
- send_stix2_bundle(bundle, **kwargs)[source]
Send a STIX2 bundle to the OpenCTI platform.
Processes and sends a STIX2 bundle to OpenCTI via the message queue or API. The bundle is split into smaller chunks and sent with proper sequencing. Supports validation workflows, draft mode, and directory export.
- Parameters:
bundle (str) – Valid STIX2 bundle as a JSON string
work_id (str, optional) – Work ID for tracking the import job (default: self.work_id)
validation_mode (str, optional) – Validation mode - “workbench” or “draft” (default: self.validation_mode)
draft_id (str, optional) – Draft context ID to send the bundle to (default: self.draft_id)
entities_types (list, optional) – List of entity types to filter (default: None)
update (bool, optional) – Whether to update existing data in the database (default: False)
event_version (str, optional) – Event version for the bundle (default: None)
bypass_validation (bool, optional) – Skip validation workflow (default: False)
force_validation (bool, optional) – Force validation even if not configured (default: self.force_validation)
entity_id (str, optional) – Entity ID for context (default: None)
file_markings (list, optional) – File markings to apply (default: None)
file_name (str, optional) – File name for workbench upload (default: None)
send_to_queue (bool, optional) – Whether to send to message queue (default: self.bundle_send_to_queue)
cleanup_inconsistent_bundle (bool, optional) – Clean up inconsistent bundle data (default: False)
send_to_directory (bool, optional) – Whether to write bundle to directory (default: self.bundle_send_to_directory)
send_to_directory_path (str, optional) – Directory path for bundle export (default: self.bundle_send_to_directory_path)
send_to_directory_retention (int, optional) – Days to retain exported files (default: self.bundle_send_to_directory_retention)
send_to_s3 (bool, optional) – Whether to upload bundle to S3 (default: self.bundle_send_to_s3)
no_split (bool, optional) – Whether to send without splitting (default: False)
- Returns:
List of processed bundle chunks
- Return type:
list
- Raises:
ValueError – If the bundle is empty or contains no valid objects
- set_state(state)[source]
Set the connector state.
Stores the connector state as a JSON string for persistence across runs. The state can be retrieved later using get_state().
- Parameters:
state (Dict or None) – State object to store, or None to clear the state
- Return type:
None
- static stix2_create_bundle(items)[source]
Create a STIX2 bundle from a list of objects.
Wraps STIX2 objects in a valid bundle structure with a generated UUID. Automatically serializes objects if they are STIX2 library instances.
- Parameters:
items (list) – List of STIX2 objects (dicts or STIX2 library objects)
- Returns:
JSON string of the STIX2 bundle, or None if items is empty
- Return type:
Optional[str]
- static stix2_deduplicate_objects(items)[source]
Deduplicate STIX2 objects by their ID.
Removes duplicate STIX2 objects from a list, keeping only the first occurrence of each unique ID.
- Parameters:
items (list) – List of STIX2 objects to deduplicate
- Returns:
Deduplicated list of STIX2 objects
- Return type:
list
- class pycti.connector.opencti_connector_helper.PingAlive(connector_logger, connector_id, api, get_state, set_state, metric, connector_info)[source]
Daemon thread that maintains connector heartbeat with OpenCTI platform.
Periodically pings the OpenCTI API to indicate the connector is alive and synchronizes connector state between local and remote instances.
- Parameters:
connector_logger (logging.Logger) – Logger instance for the connector
connector_id (str) – Unique identifier of the connector
api (OpenCTIApiClient) – OpenCTI API client instance
get_state (Callable[[], Optional[Dict]]) – Function to retrieve current connector state
set_state (Callable[[str], None]) – Function to update connector state
metric (OpenCTIMetricHandler) – Metric handler for recording ping statistics
connector_info (ConnectorInfo) – ConnectorInfo instance with runtime details
- __init__(connector_logger, connector_id, api, get_state, set_state, metric, connector_info)[source]
Initialize the PingAlive daemon thread.
- Parameters:
connector_logger (logging.Logger) – Logger instance for the connector
connector_id (str) – Unique identifier of the connector
api (OpenCTIApiClient) – OpenCTI API client instance
get_state (Callable[[], Optional[Dict]]) – Function to retrieve current connector state
set_state (Callable[[str], None]) – Function to update connector state
metric (OpenCTIMetricHandler) – Metric handler for recording ping statistics
connector_info (ConnectorInfo) – ConnectorInfo instance with runtime details
- ping()[source]
Execute the ping loop to maintain connector heartbeat.
Continuously pings the OpenCTI API every 40 seconds to: - Signal that the connector is alive - Send current connector state and info - Receive and apply any remote state updates
If the remote state differs from local state, the local state is updated to match. This allows state resets from the UI.
The loop continues until the exit_event is set.
- Return type:
None
- class pycti.connector.opencti_connector_helper.RateLimitedCallback(rate_limiter, callback)[source]
A callback wrapper that applies rate limiting.
This class wraps any callable and applies rate limiting before each call. It preserves the stop() and set_heartbeat_queue() methods from the wrapped callback if they exist.
- __init__(rate_limiter, callback)[source]
Initialize the rate-limited callback.
- Parameters:
rate_limiter (
RateLimiter) – The RateLimiter instance to usecallback (
Callable) – The callback to wrap
- class pycti.connector.opencti_connector_helper.RateLimiter(helper, max_per_minute)[source]
Rate limiter using sliding window algorithm.
- Usage:
# For batch mode (alternative to using max_per_minute in create_batch_callback): batch_callback = helper.create_batch_callback(
process_batch_func, batch_size=100, batch_timeout=30
) rate_limiter = helper.create_rate_limiter(max_per_minute=10) rate_limited_callback = rate_limiter.wrap(batch_callback) helper.listen_stream(message_callback=rate_limited_callback)
# For non-batch mode: rate_limiter = helper.create_rate_limiter(max_per_minute=100) rate_limited_callback = rate_limiter.wrap(process_message) helper.listen_stream(message_callback=rate_limited_callback)
- __init__(helper, max_per_minute)[source]
Initialize the rate limiter.
- Parameters:
helper – OpenCTIConnectorHelper instance for logging
max_per_minute (
int) – Maximum number of calls allowed per minute
- set_heartbeat_queue(q)[source]
Set the heartbeat queue for sending keepalive signals during rate limiting.
- Parameters:
q (
Queue) – Queue used by StreamAlive for heartbeat monitoring- Return type:
None
- wait_if_needed()[source]
Wait if rate limit is exceeded. Thread-safe.
Uses a sliding window algorithm to enforce max_per_minute. Sleeps if necessary to stay within the limit.
- Return type:
float- Returns:
Time spent waiting (seconds), 0 if no wait needed
- class pycti.connector.opencti_connector_helper.StreamAlive(helper, q)[source]
Watchdog thread that monitors SSE stream health via heartbeat messages.
Monitors a queue for heartbeat signals from the stream listener. If no heartbeat is received within 45 seconds, the connector is stopped to allow for reconnection.
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
q (Queue) – Queue for receiving heartbeat signals from stream listener
- __init__(helper, q)[source]
Initialize the StreamAlive watchdog thread.
- Parameters:
helper (OpenCTIConnectorHelper) – The OpenCTIConnectorHelper instance
q (Queue) – Queue for receiving heartbeat signals
- pycti.connector.opencti_connector_helper.TRUTHY: List[str] = ['yes', 'true', 'True'][source]
List of string values considered as boolean True.
- pycti.connector.opencti_connector_helper.create_callback_ssl_context(config)[source]
Create an SSL context for the API callback server.
Creates and configures an SSL context suitable for the HTTPS callback server used in API listen protocol mode. Loads certificate chain from configuration.
- Configuration keys used:
LISTEN_PROTOCOL_API_SSL_KEY: Path or PEM string for SSL private key
LISTEN_PROTOCOL_API_SSL_CERT: Path or PEM string for SSL certificate
LISTEN_PROTOCOL_API_SSL_PASSPHRASE: Optional passphrase for private key
- Parameters:
config (Dict) – Configuration dictionary containing SSL settings
- Returns:
Configured SSL context for client authentication
- Return type:
ssl.SSLContext
- pycti.connector.opencti_connector_helper.create_mq_ssl_context(config)[source]
Create an SSL context for RabbitMQ message queue connections.
Creates and configures an SSL context for secure connections to RabbitMQ. Supports CA verification, client certificates, and optional certificate verification bypass.
- Configuration keys used:
MQ_USE_SSL_CA: CA certificate for server verification
MQ_USE_SSL_CERT: Client certificate for mutual TLS
MQ_USE_SSL_KEY: Client private key for mutual TLS
MQ_USE_SSL_REJECT_UNAUTHORIZED: Whether to verify server certificate
MQ_USE_SSL_PASSPHRASE: Optional passphrase for private key
- Parameters:
config (Dict) – Configuration dictionary containing MQ SSL settings
- Returns:
Configured SSL context for RabbitMQ connections
- Return type:
ssl.SSLContext
- pycti.connector.opencti_connector_helper.data_to_temp_file(data)[source]
Write data to a temporary file securely.
Creates a temporary file with secure permissions (readable and writable only by the creating user). The file descriptor is not inherited by child processes.
Note
The caller is responsible for deleting the temporary file when it is no longer needed.
- Parameters:
data (str) – The string data to write to the temporary file
- Returns:
Absolute path to the created temporary file
- Return type:
str
Example
>>> path = data_to_temp_file("-----BEGIN PRIVATE KEY-----\n...") >>> # Use the file... >>> os.unlink(path) # Clean up when done
- pycti.connector.opencti_connector_helper.get_config_variable(env_var, yaml_path, config=None, isNumber=False, default=None, required=False)[source]
Retrieve a configuration variable from environment or YAML config.
Looks up configuration values with the following precedence: 1. Environment variable (highest priority) 2. YAML configuration file 3. Default value (lowest priority)
Boolean string values (“yes”, “true”, “True”, “no”, “false”, “False”) are automatically converted to Python bool.
- Parameters:
env_var (str) – Name of the environment variable to check
yaml_path (List[str]) – Two-element list specifying [section, key] in YAML config
config (Dict) – Configuration dictionary loaded from YAML file
isNumber (bool) – If True, convert the value to integer
default (any) – Default value if not found in env or config
required (bool) – If True and no value found, raise ValueError
- Returns:
The configuration value as bool, int, str, or None
- Return type:
Union[bool, int, None, str]
- Raises:
ValueError – If required=True and no value is found
Example
>>> get_config_variable("OPENCTI_URL", ["opencti", "url"], config) 'http://localhost:8080' >>> get_config_variable("CONNECTOR_LOG_LEVEL", ["connector", "log_level"], ... config, default="INFO") 'INFO'
- pycti.connector.opencti_connector_helper.is_memory_certificate(certificate)[source]
Check if a certificate is provided as a PEM string in memory.
Determines whether the certificate data is an in-memory PEM-formatted string (starting with “—–BEGIN”) rather than a file path.
- Parameters:
certificate (str) – The certificate data to check (PEM string or file path)
- Returns:
True if the certificate is a PEM string, False if it’s a file path
- Return type:
bool
Example
>>> is_memory_certificate("-----BEGIN CERTIFICATE-----\n...") True >>> is_memory_certificate("/path/to/cert.pem") False
- pycti.connector.opencti_connector_helper.killProgramHook(etype, value, tb)[source]
Exception hook to terminate the program on unhandled exceptions.
This function is used as a system exception hook to ensure the program terminates cleanly when an unhandled exception occurs, particularly useful for background threads.
- Parameters:
etype (type) – The exception type (class)
value (BaseException) – The exception instance
tb (types.TracebackType) – The traceback object
- Return type:
None
- pycti.connector.opencti_connector_helper.normalize_email_prefix(email)[source]
Normalize the local part of an email address by replacing invalid characters.
Replaces any characters not valid in email prefixes with hyphens. Valid characters include: a-z, A-Z, 0-9, and special chars: . _ + - Consecutive hyphens are collapsed and leading/trailing hyphens are removed.
- Parameters:
email (str) – Email address to normalize
- Returns:
Normalized email address with valid local part
- Return type:
str
- Raises:
ValueError – If the email address does not contain an ‘@’ symbol
Example
>>> normalize_email_prefix("john.doe@example.com") 'john.doe@example.com' >>> normalize_email_prefix("john@doe@example.com") 'john-doe@example.com' >>> normalize_email_prefix("user!name@domain.com") 'user-name@domain.com'
- pycti.connector.opencti_connector_helper.ssl_cert_chain(ssl_context, cert_data, key_data, passphrase)[source]
Load a certificate chain and private key into an SSL context.
Configures the SSL context with a client certificate and private key for mutual TLS authentication. Supports both file paths and in-memory PEM-formatted certificates/keys. Temporary files are created and cleaned up automatically when using in-memory data.
- Parameters:
ssl_context (ssl.SSLContext) – The SSL context to configure
cert_data (str or None) – Certificate data as file path or PEM string, or None to skip
key_data (str or None) – Private key data as file path or PEM string, or None
passphrase (str or None) – Passphrase for encrypted private key, or None if unencrypted
- Return type:
None
Example
>>> ssl_ctx = ssl.create_default_context() >>> ssl_cert_chain(ssl_ctx, "/path/to/cert.pem", "/path/to/key.pem", None)
- pycti.connector.opencti_connector_helper.ssl_verify_locations(ssl_context, certdata)[source]
Load CA certificate verification locations into an SSL context.
Configures the SSL context with certificate authority (CA) certificates for verifying peer certificates. Supports both file paths and in-memory PEM-formatted certificates.
- Parameters:
ssl_context (ssl.SSLContext) – The SSL context to configure
certdata (str or None) – CA certificate data as file path or PEM string, or None to skip
- Return type:
None
Example
>>> ssl_ctx = ssl.create_default_context() >>> ssl_verify_locations(ssl_ctx, "/path/to/ca-bundle.crt") >>> ssl_verify_locations(ssl_ctx, "-----BEGIN CERTIFICATE-----\n...")
- pycti.connector.opencti_connector_helper.start_loop(loop)[source]
Start an asyncio event loop and run it forever.
Sets the given event loop as the current loop for the thread and runs it indefinitely until stopped.
- Parameters:
loop (asyncio.AbstractEventLoop) – The asyncio event loop to start
- Return type:
None