# coding: utf-8
import json
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
from .indicator.opencti_indicator_properties import (
INDICATOR_PROPERTIES,
INDICATOR_PROPERTIES_WITH_FILES,
)
[docs]
class Indicator:
"""Main Indicator class for OpenCTI
Manages threat indicators and detection patterns in the OpenCTI platform.
:param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type opencti: OpenCTIApiClient
"""
[docs]
def __init__(self, opencti):
"""Initialize the Indicator instance.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.properties = INDICATOR_PROPERTIES
[docs]
self.properties_with_files = INDICATOR_PROPERTIES_WITH_FILES
@staticmethod
[docs]
def generate_id(pattern):
"""Generate a STIX ID for an Indicator.
:param pattern: The STIX pattern
:type pattern: str
:return: STIX ID for the indicator
:rtype: str
"""
data = {"pattern": pattern.strip()}
data = canonicalize(data, utf8=False)
id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data))
return "indicator--" + id
@staticmethod
[docs]
def generate_id_from_data(data):
"""Generate a STIX ID from indicator data.
:param data: Dictionary containing 'pattern' key
:type data: dict
:return: STIX ID for the indicator
:rtype: str
"""
return Indicator.generate_id(data["pattern"])
[docs]
def list(self, **kwargs):
"""List Indicator objects.
:param filters: (optional) the filters to apply
:type filters: dict
:param search: (optional) a search keyword to apply for the listing
:type search: str
:param first: (optional) return the first n rows from the `after` ID or the beginning if not set
:type first: int
:param after: (optional) OpenCTI object ID of the first row for pagination
:type after: str
:param orderBy: (optional) the field to order the response on
:type orderBy: str
:param orderMode: (optional) either "asc" or "desc"
:type orderMode: str
:param customAttributes: (optional) list of attributes keys to return
:type customAttributes: str
:param getAll: (optional) switch to return all entries (be careful to use this without any other filters)
:type getAll: bool
:param withPagination: (optional) switch to use pagination
:type withPagination: bool
:param withFiles: (optional) include files in response
:type withFiles: bool
:param toStix: (optional) get in STIX format
:type toStix: bool
:return: List of Indicators
:rtype: list
"""
filters = kwargs.get("filters", None)
search = kwargs.get("search", None)
first = kwargs.get("first", 500)
after = kwargs.get("after", None)
order_by = kwargs.get("orderBy", None)
order_mode = kwargs.get("orderMode", None)
custom_attributes = kwargs.get("customAttributes", None)
get_all = kwargs.get("getAll", False)
with_pagination = kwargs.get("withPagination", False)
with_files = kwargs.get("withFiles", False)
to_stix = kwargs.get("toStix", False)
self.opencti.app_logger.info(
"Listing Indicators with filters", {"filters": json.dumps(filters)}
)
query = (
"""
query Indicators($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: IndicatorsOrdering, $orderMode: OrderingMode, $toStix: Boolean) {
indicators(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, toStix: $toStix) {
edges {
node {
"""
+ (
"toStix"
if to_stix
else (
custom_attributes
if custom_attributes is not None
else (self.properties_with_files if with_files else self.properties)
)
)
+ """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
)
result = self.opencti.query(
query,
{
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
"toStix": to_stix,
},
)
if get_all:
final_data = []
data = self.opencti.process_multiple(result["data"]["indicators"])
final_data = final_data + data
while result["data"]["indicators"]["pageInfo"]["hasNextPage"]:
after = result["data"]["indicators"]["pageInfo"]["endCursor"]
self.opencti.app_logger.debug("Listing Indicators", {"after": after})
result = self.opencti.query(
query,
{
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
"toStix": to_stix,
},
)
data = self.opencti.process_multiple(result["data"]["indicators"])
final_data = final_data + data
return final_data
else:
return self.opencti.process_multiple(
result["data"]["indicators"], with_pagination
)
[docs]
def read(self, **kwargs):
"""Read an Indicator object.
Read can be either used with a known OpenCTI entity `id` or by using a
valid filter to search and return a single Indicator entity or None.
Note: either `id` or `filters` is required.
:param id: the id of the Indicator
:type id: str
:param filters: the filters to apply if no id provided
:type filters: dict
:param customAttributes: custom attributes to return
:type customAttributes: str
:param withFiles: whether to include files
:type withFiles: bool
:return: Indicator object
:rtype: dict or None
"""
id = kwargs.get("id", None)
filters = kwargs.get("filters", None)
custom_attributes = kwargs.get("customAttributes", None)
with_files = kwargs.get("withFiles", False)
if id is not None:
self.opencti.app_logger.info("Reading Indicator", {"id": id})
query = (
"""
query Indicator($id: String!) {
indicator(id: $id) {
"""
+ (
custom_attributes
if custom_attributes is not None
else (self.properties_with_files if with_files else self.properties)
)
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(result["data"]["indicator"])
elif filters is not None:
result = self.list(filters=filters, customAttributes=custom_attributes)
if len(result) > 0:
return result[0]
else:
return None
else:
self.opencti.app_logger.error(
"[opencti_indicator] Missing parameters: id or filters"
)
return None
[docs]
def create(self, **kwargs):
"""Create an Indicator object.
:param stix_id: (optional) the STIX ID
:type stix_id: str
:param createdBy: (optional) the author ID
:type createdBy: str
:param objectMarking: (optional) list of marking definition IDs
:type objectMarking: list
:param objectLabel: (optional) list of label IDs
:type objectLabel: list
:param externalReferences: (optional) list of external reference IDs
:type externalReferences: list
:param revoked: (optional) whether the indicator is revoked
:type revoked: bool
:param confidence: (optional) confidence level (0-100)
:type confidence: int
:param lang: (optional) language
:type lang: str
:param created: (optional) creation date
:type created: str
:param modified: (optional) modification date
:type modified: str
:param pattern_type: the pattern type (required)
:type pattern_type: str
:param pattern_version: (optional) the pattern version
:type pattern_version: str
:param pattern: the indicator pattern (required)
:type pattern: str
:param name: the name of the Indicator (defaults to pattern)
:type name: str
:param description: (optional) description
:type description: str
:param indicator_types: (optional) list of indicator types
:type indicator_types: list
:param valid_from: (optional) valid from date
:type valid_from: str
:param valid_until: (optional) valid until date
:type valid_until: str
:param x_opencti_score: (optional) score (default: 50)
:type x_opencti_score: int
:param x_opencti_detection: (optional) detection flag (default: False)
:type x_opencti_detection: bool
:param x_opencti_main_observable_type: the main observable type (required)
:type x_opencti_main_observable_type: str
:param x_mitre_platforms: (optional) list of MITRE platforms
:type x_mitre_platforms: list
:param killChainPhases: (optional) list of kill chain phase IDs
:type killChainPhases: list
:param x_opencti_stix_ids: (optional) list of additional STIX IDs
:type x_opencti_stix_ids: list
:param x_opencti_create_observables: (optional) create observables (default: False)
:type x_opencti_create_observables: bool
:param objectOrganization: (optional) list of organization IDs
:type objectOrganization: list
:param x_opencti_workflow_id: (optional) workflow ID
:type x_opencti_workflow_id: str
:param x_opencti_modified_at: (optional) custom modification date
:type x_opencti_modified_at: str
:param update: (optional) whether to update if exists (default: False)
:type update: bool
:param files: (optional) list of File objects to attach
:type files: list
:param filesMarkings: (optional) list of lists of marking definition IDs for each file
:type filesMarkings: list
:return: Indicator object
:rtype: dict or None
"""
stix_id = kwargs.get("stix_id", None)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
external_references = kwargs.get("externalReferences", None)
revoked = kwargs.get("revoked", None)
confidence = kwargs.get("confidence", None)
lang = kwargs.get("lang", None)
created = kwargs.get("created", None)
modified = kwargs.get("modified", None)
pattern_type = kwargs.get("pattern_type", None)
pattern_version = kwargs.get("pattern_version", None)
pattern = kwargs.get("pattern", None)
name = kwargs.get("name", kwargs.get("pattern", None))
description = kwargs.get("description", None)
indicator_types = kwargs.get("indicator_types", None)
valid_from = kwargs.get("valid_from", None)
valid_until = kwargs.get("valid_until", None)
x_opencti_score = kwargs.get("x_opencti_score", 50)
x_opencti_detection = kwargs.get("x_opencti_detection", False)
x_opencti_main_observable_type = kwargs.get(
"x_opencti_main_observable_type", None
)
x_mitre_platforms = kwargs.get("x_mitre_platforms", None)
kill_chain_phases = kwargs.get("killChainPhases", None)
x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
create_observables = kwargs.get("x_opencti_create_observables", False)
granted_refs = kwargs.get("objectOrganization", None)
x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
x_opencti_modified_at = kwargs.get("x_opencti_modified_at", None)
update = kwargs.get("update", False)
files = kwargs.get("files", None)
files_markings = kwargs.get("filesMarkings", None)
no_trigger_import = kwargs.get("noTriggerImport", None)
embedded = kwargs.get("embedded", None)
upsert_operations = kwargs.get("upsert_operations", None)
if (
name is not None
and pattern is not None
and pattern_type is not None
and x_opencti_main_observable_type is not None
):
if x_opencti_main_observable_type == "File":
x_opencti_main_observable_type = "StixFile"
self.opencti.app_logger.info("Creating Indicator", {"name": name})
query = """
mutation IndicatorAdd($input: IndicatorAddInput!) {
indicatorAdd(input: $input) {
id
standard_id
entity_type
parent_types
observables {
edges {
node {
id
standard_id
entity_type
}
}
}
}
}
"""
result = self.opencti.query(
query,
{
"input": {
"stix_id": stix_id,
"createdBy": created_by,
"objectMarking": object_marking,
"objectLabel": object_label,
"objectOrganization": granted_refs,
"externalReferences": external_references,
"revoked": revoked,
"confidence": confidence,
"lang": lang,
"created": created,
"modified": modified,
"pattern_type": pattern_type,
"pattern_version": pattern_version,
"pattern": pattern,
"name": name,
"description": description,
"indicator_types": indicator_types,
"valid_until": valid_until,
"valid_from": valid_from,
"x_opencti_score": x_opencti_score,
"x_opencti_detection": x_opencti_detection,
"x_opencti_main_observable_type": x_opencti_main_observable_type,
"x_mitre_platforms": x_mitre_platforms,
"x_opencti_stix_ids": x_opencti_stix_ids,
"killChainPhases": kill_chain_phases,
"createObservables": create_observables,
"x_opencti_workflow_id": x_opencti_workflow_id,
"x_opencti_modified_at": x_opencti_modified_at,
"update": update,
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
"upsertOperations": upsert_operations,
}
},
)
return self.opencti.process_multiple_fields(result["data"]["indicatorAdd"])
else:
self.opencti.app_logger.error(
"[opencti_indicator] Missing parameters: "
"name or pattern or pattern_type or x_opencti_main_observable_type"
)
return None
[docs]
def update_field(self, **kwargs):
"""Update an Indicator object field.
:param id: the Indicator id
:type id: str
:param input: the input of the field
:type input: list
:return: Updated indicator object
:rtype: dict or None
"""
id = kwargs.get("id", None)
input = kwargs.get("input", None)
if id is not None and input is not None:
self.opencti.app_logger.info("Updating Indicator", {"id": id})
query = """
mutation IndicatorFieldPatch($id: ID!, $input: [EditInput!]!) {
indicatorFieldPatch(id: $id, input: $input) {
id
standard_id
entity_type
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"input": input,
},
)
return self.opencti.process_multiple_fields(
result["data"]["indicatorFieldPatch"]
)
else:
self.opencti.app_logger.error(
"[opencti_indicator] Cannot update indicator field, missing parameters: id and input"
)
return None
[docs]
def add_stix_cyber_observable(self, **kwargs):
"""Add a Stix-Cyber-Observable object to Indicator object (based-on).
:param id: the id of the Indicator
:type id: str
:param indicator: Indicator object
:type indicator: dict
:param stix_cyber_observable_id: the id of the Stix-Observable
:type stix_cyber_observable_id: str
:return: True if there has been no import error
:rtype: bool
"""
id = kwargs.get("id", None)
indicator = kwargs.get("indicator", None)
stix_cyber_observable_id = kwargs.get("stix_cyber_observable_id", None)
if id is not None and stix_cyber_observable_id is not None:
if indicator is None:
indicator = self.read(id=id)
if indicator is None:
self.opencti.app_logger.error(
"[opencti_indicator] Cannot add Object Ref, indicator not found"
)
return False
if stix_cyber_observable_id in indicator["observablesIds"]:
return True
else:
self.opencti.app_logger.info(
"Adding Stix-Observable to Indicator",
{"observable": stix_cyber_observable_id, "indicator": id},
)
query = """
mutation StixCoreRelationshipAdd($input: StixCoreRelationshipAddInput!) {
stixCoreRelationshipAdd(input: $input) {
id
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"fromId": id,
"toId": stix_cyber_observable_id,
"relationship_type": "based-on",
},
},
)
return True
else:
self.opencti.app_logger.error(
"[opencti_indicator] Missing parameters: id and stix cyber_observable_id"
)
return False
[docs]
def import_from_stix2(self, **kwargs):
"""Import an Indicator object from a STIX2 object.
:param stixObject: the Stix-Object Indicator
:type stixObject: dict
:param extras: extra dict
:type extras: dict
:param update: set the update flag on import
:type update: bool
:return: Indicator object
:rtype: dict or None
"""
stix_object = kwargs.get("stixObject", None)
extras = kwargs.get("extras", {})
update = kwargs.get("update", False)
if stix_object is not None:
# Search in extensions
if "x_opencti_score" not in stix_object:
stix_object["x_opencti_score"] = (
self.opencti.get_attribute_in_extension("score", stix_object)
)
if "x_opencti_detection" not in stix_object:
stix_object["x_opencti_detection"] = (
self.opencti.get_attribute_in_extension("detection", stix_object)
)
if (
"x_opencti_main_observable_type" not in stix_object
and self.opencti.get_attribute_in_extension(
"main_observable_type", stix_object
)
is not None
):
stix_object["x_opencti_main_observable_type"] = (
self.opencti.get_attribute_in_extension(
"main_observable_type", stix_object
)
)
if "x_opencti_create_observables" not in stix_object:
stix_object["x_opencti_create_observables"] = (
self.opencti.get_attribute_in_extension(
"create_observables", stix_object
)
)
if "x_opencti_stix_ids" not in stix_object:
stix_object["x_opencti_stix_ids"] = (
self.opencti.get_attribute_in_extension("stix_ids", stix_object)
)
if "x_opencti_granted_refs" not in stix_object:
stix_object["x_opencti_granted_refs"] = (
self.opencti.get_attribute_in_extension("granted_refs", stix_object)
)
if "x_opencti_workflow_id" not in stix_object:
stix_object["x_opencti_workflow_id"] = (
self.opencti.get_attribute_in_extension("workflow_id", stix_object)
)
if "x_mitre_platforms" not in stix_object:
stix_object["x_mitre_platforms"] = (
self.opencti.get_attribute_in_mitre_extension(
"platforms", stix_object
)
)
if "x_opencti_modified_at" not in stix_object:
stix_object["x_opencti_modified_at"] = (
self.opencti.get_attribute_in_extension("modified_at", stix_object)
)
if "opencti_upsert_operations" not in stix_object:
stix_object["opencti_upsert_operations"] = (
self.opencti.get_attribute_in_extension(
"opencti_upsert_operations", stix_object
)
)
return self.create(
stix_id=stix_object["id"],
createdBy=(
extras["created_by_id"] if "created_by_id" in extras else None
),
objectMarking=(
extras["object_marking_ids"]
if "object_marking_ids" in extras
else None
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
revoked=stix_object["revoked"] if "revoked" in stix_object else None,
confidence=(
stix_object["confidence"] if "confidence" in stix_object else None
),
lang=stix_object["lang"] if "lang" in stix_object else None,
created=stix_object["created"] if "created" in stix_object else None,
modified=stix_object["modified"] if "modified" in stix_object else None,
pattern_type=(
stix_object["pattern_type"]
if "pattern_type" in stix_object
else None
),
pattern_version=(
stix_object["pattern_version"]
if "pattern_version" in stix_object
else None
),
pattern=stix_object["pattern"] if "pattern" in stix_object else "",
name=(
stix_object["name"]
if "name" in stix_object
else stix_object["pattern"]
),
description=(
self.opencti.stix2.convert_markdown(stix_object["description"])
if "description" in stix_object
else None
),
indicator_types=(
stix_object["indicator_types"]
if "indicator_types" in stix_object
else None
),
valid_from=(
stix_object["valid_from"] if "valid_from" in stix_object else None
),
valid_until=(
stix_object["valid_until"] if "valid_until" in stix_object else None
),
x_opencti_score=(
stix_object["x_opencti_score"]
if "x_opencti_score" in stix_object
else 50
),
x_opencti_detection=(
stix_object["x_opencti_detection"]
if "x_opencti_detection" in stix_object
else False
),
x_mitre_platforms=(
stix_object["x_mitre_platforms"]
if "x_mitre_platforms" in stix_object
else None
),
x_opencti_main_observable_type=(
stix_object["x_opencti_main_observable_type"]
if "x_opencti_main_observable_type" in stix_object
else "Unknown"
),
killChainPhases=(
extras["kill_chain_phases_ids"]
if "kill_chain_phases_ids" in extras
else None
),
x_opencti_stix_ids=(
stix_object["x_opencti_stix_ids"]
if "x_opencti_stix_ids" in stix_object
else None
),
x_opencti_create_observables=(
stix_object["x_opencti_create_observables"]
if "x_opencti_create_observables" in stix_object
else False
),
objectOrganization=(
stix_object["x_opencti_granted_refs"]
if "x_opencti_granted_refs" in stix_object
else None
),
x_opencti_workflow_id=(
stix_object["x_opencti_workflow_id"]
if "x_opencti_workflow_id" in stix_object
else None
),
x_opencti_modified_at=(
stix_object["x_opencti_modified_at"]
if "x_opencti_modified_at" in stix_object
else None
),
update=update,
files=extras.get("files"),
filesMarkings=extras.get("filesMarkings"),
noTriggerImport=extras.get("noTriggerImport", None),
embedded=extras.get("embedded", None),
upsert_operations=(
stix_object["opencti_upsert_operations"]
if "opencti_upsert_operations" in stix_object
else None
),
)
else:
self.opencti.app_logger.error(
"[opencti_indicator] Missing parameters: stixObject"
)
return None