import time
import traceback
from typing import Dict, List, Optional
[docs]
class OpenCTIApiWork:
"""OpenCTI Work API class.
Manages work/job operations for connectors.
:param api: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type api: OpenCTIApiClient
"""
[docs]
def __init__(self, api):
"""Initialize the OpenCTIApiWork instance.
:param api: OpenCTI API client instance
:type api: OpenCTIApiClient
"""
[docs]
def to_received(self, work_id: str, message: str):
"""Mark work as received.
:param work_id: the work id
:type work_id: str
:param message: the message to report
:type message: str
:return: None
:rtype: None
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info(
"Reporting work update_received", {"work_id": work_id}
)
query = """
mutation workToReceived($id: ID!, $message: String) {
workEdit(id: $id) {
toReceived (message: $message)
}
}
"""
self.api.query(query, {"id": work_id, "message": message}, True)
[docs]
def to_processed(self, work_id: str, message: str, in_error: bool = False):
"""Mark work as processed.
:param work_id: the work id
:type work_id: str
:param message: the message to report
:type message: str
:param in_error: whether the work completed with error, defaults to False
:type in_error: bool, optional
:return: None
:rtype: None
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info(
"Reporting work update_processed", {"work_id": work_id}
)
query = """
mutation workToProcessed($id: ID!, $message: String, $inError: Boolean) {
workEdit(id: $id) {
toProcessed (message: $message, inError: $inError)
}
}
"""
self.api.query(
query, {"id": work_id, "message": message, "inError": in_error}, True
)
[docs]
def ping(self, work_id: str):
"""Ping a work to keep it alive.
:param work_id: the work id
:type work_id: str
:return: None
:rtype: None
"""
self.api.app_logger.info("Ping work", {"work_id": work_id})
query = """
mutation pingWork($id: ID!) {
workEdit(id: $id) {
ping
}
}
"""
self.api.query(query, {"id": work_id})
[docs]
def report_expectation(self, work_id: str, error):
"""Report a work expectation.
:param work_id: the work id
:type work_id: str
:param error: the error to report (WorkErrorInput format)
:type error: dict
:return: None
:rtype: None
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info("Report expectation", {"work_id": work_id})
query = """
mutation reportExpectation($id: ID!, $error: WorkErrorInput) {
workEdit(id: $id) {
reportExpectation(error: $error)
}
}
"""
try:
self.api.query(query, {"id": work_id, "error": error}, True)
except Exception:
self.api.app_logger.error("Cannot report expectation")
[docs]
def add_expectations(self, work_id: str, expectations: int):
"""Add expectations to a work.
:param work_id: the work id
:type work_id: str
:param expectations: the number of expectations to add
:type expectations: int
:return: whether the work is still alive
:rtype: bool
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info(
"Update action expectations",
{"work_id": work_id, "expectations": expectations},
)
query = """
mutation addExpectations($id: ID!, $expectations: Int) {
workEdit(id: $id) {
addExpectations(expectations: $expectations)
}
}
"""
try:
self.api.query(
query, {"id": work_id, "expectations": expectations}, True
)
except Exception:
error_msg = traceback.format_exc()
if "WORK_NOT_ALIVE" in error_msg:
self.api.app_logger.info(
"Work no longer exists",
{"work_id": work_id},
)
return False
else:
self.api.app_logger.error("Cannot add expectations")
return True
[docs]
def add_draft_context(self, work_id: str, draft_context: str):
"""Add draft context to a work.
:param work_id: the work id
:type work_id: str
:param draft_context: the draft context to add
:type draft_context: str
:return: None
:rtype: None
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info(
"Update draft context",
{"work_id": work_id, "draft_context": draft_context},
)
query = """
mutation addDraftContext($id: ID!, $draftContext: String) {
workEdit(id: $id) {
addDraftContext(draftContext: $draftContext)
}
}
"""
try:
self.api.query(
query, {"id": work_id, "draftContext": draft_context}, True
)
except Exception:
self.api.app_logger.error("Cannot add draft context")
[docs]
def initiate_work(
self,
connector_id: str,
friendly_name: str,
is_multipart: bool = False,
) -> Optional[str]:
"""Initiate a new work for a connector.
:param connector_id: the connector id
:type connector_id: str
:param friendly_name: the friendly name for the work
:type friendly_name: str
:param is_multipart: indicates whether multiple calls to `add_expectations`
are to be expected during the lifetime of the work.
In consequence the work won't automatically
transition to `complete` when the number of calls
to `report_expectation` matches the expectations
but only when an explicit call to `to_processed`
is made.
Should be set to `True` when sending multiple
STIX bundles consecutively via `send_stix2_bundle`
during the work's lifetime.
Defaults to `False`.
:type is_multipart: bool
:return: the work id or None if bundle_send_to_queue is False
:rtype: str or None
"""
if self.api.bundle_send_to_queue:
self.api.app_logger.info(
"Initiate work",
{
"connector_id": connector_id,
"friendly_name": friendly_name,
"is_multipart": is_multipart,
},
)
query = """
mutation workAdd($connectorId: String!, $friendlyName: String, $isMultiPartWork: Boolean) {
workAdd(connectorId: $connectorId, friendlyName: $friendlyName, isMultiPartWork: $isMultiPartWork) {
id
}
}
"""
work = self.api.query(
query,
{
"connectorId": connector_id,
"friendlyName": friendly_name,
"isMultiPartWork": is_multipart,
},
True,
)
return work["data"]["workAdd"]["id"]
return None
[docs]
def delete_work(self, work_id: str):
"""Delete a work.
.. deprecated::
Use :meth:`delete` instead.
:param work_id: the work id
:type work_id: str
:return: the response data
:rtype: dict
"""
return self.delete(id=work_id)
[docs]
def delete(self, **kwargs):
"""Delete a work by id.
:param id: the work id
:type id: str
:return: the response data
:rtype: dict or None
"""
work_id = kwargs.get("id", None)
if work_id is None:
self.api.admin_logger.error(
"[opencti_work] Cannot delete work, missing parameter: id"
)
return None
query = """
mutation ConnectorWorksMutation($workId: ID!) {
workEdit(id: $workId) {
delete
}
}"""
work = self.api.query(
query,
{"workId": work_id},
)
return work["data"]
[docs]
def wait_for_work_to_finish(self, work_id: str):
"""Wait for a work to finish.
:param work_id: the work id
:type work_id: str
:return: empty string if error, None otherwise
:rtype: str or None
"""
status = ""
while status != "complete":
state = self.get_work(work_id=work_id)
if len(state) > 0:
status = state["status"]
if state["errors"]:
self.api.app_logger.error(
"Unexpected connector error", {"state_errors": state["errors"]}
)
return ""
if status == "complete":
return
time.sleep(1)
[docs]
def get_work(self, work_id: str) -> Dict:
"""Get a work by id.
:param work_id: the work id
:type work_id: str
:return: the work data
:rtype: dict
"""
query = """
query WorkQuery($id: ID!) {
work(id: $id) {
id
name
user {
name
}
timestamp
status
event_source_id
received_time
processed_time
completed_time
tracking {
import_expected_number
import_processed_number
}
messages {
timestamp
message
sequence
source
}
errors {
timestamp
message
sequence
source
}
}
}
"""
result = self.api.query(query, {"id": work_id}, True)
return result["data"]["work"]
[docs]
def get_is_work_alive(self, work_id: str) -> bool:
"""Check if a work is alive.
:param work_id: the work id
:type work_id: str
:return: whether the work is alive
:rtype: bool
"""
query = """
query WorkAliveQuery($id: ID!) {
isWorkAlive(id: $id)
}
"""
result = self.api.query(query, {"id": work_id}, True)
return result["data"]["isWorkAlive"]
[docs]
def get_connector_works(self, connector_id: str) -> List[Dict]:
"""Get all works for a connector.
:param connector_id: the connector id
:type connector_id: str
:return: list of work dictionaries sorted by timestamp
:rtype: list[dict]
"""
query = """
query ConnectorWorksQuery(
$count: Int
$orderBy: WorksOrdering
$orderMode: OrderingMode
$filters: FilterGroup
) {
works(
first: $count
orderBy: $orderBy
orderMode: $orderMode
filters: $filters
) {
edges {
node {
id
name
user {
name
}
timestamp
status
event_source_id
received_time
processed_time
completed_time
tracking {
import_expected_number
import_processed_number
}
messages {
timestamp
message
sequence
source
}
errors {
timestamp
message
sequence
source
}
}
}
}
}
"""
result = self.api.query(
query,
{
"count": 50,
"filters": {
"mode": "and",
"filters": [{"key": "connector_id", "values": [connector_id]}],
"filterGroups": [],
},
},
True,
)
result = result["data"]["works"]["edges"]
return_value = []
for node in result:
node = node["node"]
return_value.append(node)
return sorted(return_value, key=lambda i: i["timestamp"])