# coding: utf-8
import base64
import json
import os
import magic
from .indicator.opencti_indicator_properties import INDICATOR_PROPERTIES
from .stix_cyber_observable.opencti_stix_cyber_observable_deprecated import (
StixCyberObservableDeprecatedMixin,
)
from .stix_cyber_observable.opencti_stix_cyber_observable_properties import (
SCO_PROPERTIES,
SCO_PROPERTIES_WITH_FILES,
)
[docs]
class StixCyberObservable(StixCyberObservableDeprecatedMixin):
"""Main StixCyberObservable class for OpenCTI
Manages STIX cyber observables (indicators of compromise) in the OpenCTI platform.
Note: Deprecated methods are available through StixCyberObservableDeprecatedMixin.
:param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type opencti: OpenCTIApiClient
"""
[docs]
def __init__(self, opencti):
"""Initialize the StixCyberObservable instance.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.properties = SCO_PROPERTIES
[docs]
self.properties_with_files = SCO_PROPERTIES_WITH_FILES
[docs]
def list(self, **kwargs):
"""List StixCyberObservable objects.
:param types: the array of types
:type types: list
:param filters: the filters to apply
:type filters: dict
:param search: the search keyword
:type search: str
:param first: return the first n rows from the after ID (or the beginning if not set)
:type first: int
:param after: ID of the first row for pagination
:type after: str
:param orderBy: field to order results by
:type orderBy: str
:param orderMode: ordering mode (asc/desc)
:type orderMode: str
:param customAttributes: custom attributes to return
:type customAttributes: str
:param getAll: whether to retrieve all results
:type getAll: bool
:param withPagination: whether to include pagination info
:type withPagination: bool
:param withFiles: whether to include files
:type withFiles: bool
:return: List of StixCyberObservable objects
:rtype: list
"""
types = kwargs.get("types", None)
filters = kwargs.get("filters", None)
search = kwargs.get("search", None)
first = kwargs.get("first", 100)
after = kwargs.get("after", None)
order_by = kwargs.get("orderBy", None)
order_mode = kwargs.get("orderMode", None)
custom_attributes = kwargs.get("customAttributes", None)
get_all = kwargs.get("getAll", False)
with_pagination = kwargs.get("withPagination", False)
with_files = kwargs.get("withFiles", False)
self.opencti.app_logger.info(
"Listing StixCyberObservables with filters",
{"filters": json.dumps(filters)},
)
query = (
"""
query StixCyberObservables($types: [String], $filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: StixCyberObservablesOrdering, $orderMode: OrderingMode) {
stixCyberObservables(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
edges {
node {
"""
+ (
custom_attributes
if custom_attributes is not None
else (self.properties_with_files if with_files else self.properties)
)
+ """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
)
result = self.opencti.query(
query,
{
"types": types,
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
},
)
if get_all:
final_data = []
data = self.opencti.process_multiple(result["data"]["stixCyberObservables"])
final_data = final_data + data
while result["data"]["stixCyberObservables"]["pageInfo"]["hasNextPage"]:
after = result["data"]["stixCyberObservables"]["pageInfo"]["endCursor"]
self.opencti.app_logger.info(
"Listing StixCyberObservables", {"after": after}
)
result = self.opencti.query(
query,
{
"types": types,
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
},
)
data = self.opencti.process_multiple(
result["data"]["stixCyberObservables"]
)
final_data = final_data + data
return final_data
else:
return self.opencti.process_multiple(
result["data"]["stixCyberObservables"], with_pagination
)
[docs]
def read(self, **kwargs):
"""Read a StixCyberObservable object.
:param id: the id of the StixCyberObservable
:type id: str
:param filters: the filters to apply if no id provided
:type filters: dict
:param customAttributes: custom attributes to return
:type customAttributes: str
:param withFiles: whether to include files
:type withFiles: bool
:return: StixCyberObservable object
:rtype: dict or None
"""
id = kwargs.get("id", None)
filters = kwargs.get("filters", None)
custom_attributes = kwargs.get("customAttributes", None)
with_files = kwargs.get("withFiles", False)
if id is not None:
self.opencti.app_logger.info("Reading StixCyberObservable", {"id": id})
query = (
"""
query StixCyberObservable($id: String!) {
stixCyberObservable(id: $id) {
"""
+ (
custom_attributes
if custom_attributes is not None
else (self.properties_with_files if with_files else self.properties)
)
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(
result["data"]["stixCyberObservable"]
)
elif filters is not None:
result = self.list(filters=filters, customAttributes=custom_attributes)
if len(result) > 0:
return result[0]
else:
return None
else:
self.opencti.app_logger.error(
"[opencti_stix_cyber_observable] Missing parameters: id or filters"
)
return None
[docs]
def add_file(self, **kwargs):
"""Upload a file in this Observable.
:param id: the Stix-Cyber-Observable id
:type id: str
:param file_name: name of the file to upload
:type file_name: str
:param data: the file data
:type data: bytes or str
:param fileMarkings: list of marking definition IDs for the file
:type fileMarkings: list
:param version: file version
:type version: str
:param mime_type: MIME type of the file (default: text/plain)
:type mime_type: str
:param no_trigger_import: whether to skip import trigger
:type no_trigger_import: bool
:param embedded: whether the file is embedded
:type embedded: bool
:return: updated StixCyberObservable object
:rtype: dict or None
"""
id = kwargs.get("id", None)
file_name = kwargs.get("file_name", None)
data = kwargs.get("data", None)
file_markings = kwargs.get("fileMarkings", None)
version = kwargs.get("version", None)
mime_type = kwargs.get("mime_type", "text/plain")
no_trigger_import = kwargs.get("no_trigger_import", False)
embedded = kwargs.get("embedded", False)
if id is not None and file_name is not None:
final_file_name = os.path.basename(file_name)
query = """
mutation StixCyberObservableEdit($id: ID!, $file: Upload!, $fileMarkings: [String], $version: DateTime, $noTriggerImport: Boolean, $embedded: Boolean) {
stixCyberObservableEdit(id: $id) {
importPush(file: $file, version: $version, fileMarkings: $fileMarkings, noTriggerImport: $noTriggerImport, embedded: $embedded) {
id
name
}
}
}
"""
if data is None:
data = open(file_name, "rb")
if file_name.endswith(".json"):
mime_type = "application/json"
else:
mime_type = magic.from_file(file_name, mime=True)
self.opencti.app_logger.info(
"Uploading a file in Stix-Cyber-Observable",
{"file": final_file_name, "id": id},
)
return self.opencti.query(
query,
{
"id": id,
"file": (self.opencti.file(final_file_name, data, mime_type)),
"fileMarkings": file_markings,
"version": version,
"noTriggerImport": (
no_trigger_import
if isinstance(no_trigger_import, bool)
else no_trigger_import == "True"
),
"embedded": embedded,
},
)
else:
self.opencti.app_logger.error(
"[opencti_stix_cyber_observable] Missing parameters: id or file_name"
)
return None
[docs]
def create(self, **kwargs):
"""Create a Stix-Cyber-Observable object.
:param observableData: the data of the observable (STIX2 structure)
:type observableData: dict
:param simple_observable_id: (optional) simple observable STIX ID
:type simple_observable_id: str
:param simple_observable_key: (optional) simple observable key (e.g., "IPv4-Addr.value")
:type simple_observable_key: str
:param simple_observable_value: (optional) simple observable value
:type simple_observable_value: str
:param simple_observable_description: (optional) simple observable description
:type simple_observable_description: str
:param x_opencti_score: (optional) score (0-100)
:type x_opencti_score: int
:param createdBy: (optional) the author ID
:type createdBy: str
:param objectMarking: (optional) list of marking definition IDs
:type objectMarking: list
:param objectLabel: (optional) list of label IDs
:type objectLabel: list
:param externalReferences: (optional) list of external reference IDs
:type externalReferences: list
:param objectOrganization: (optional) list of organization IDs
:type objectOrganization: list
:param update: (optional) whether to update if exists (default: False)
:type update: bool
:param resolve_result_indicators: (optional) resolve result indicators (default: True)
:type resolve_result_indicators: bool
:param files: (optional) list of File objects to attach
:type files: list
:param filesMarkings: (optional) list of lists of marking definition IDs for each file
:type filesMarkings: list
:return: Stix-Cyber-Observable object
:rtype: dict or None
"""
observable_data = kwargs.get("observableData", {})
simple_observable_id = kwargs.get("simple_observable_id", None)
simple_observable_key = kwargs.get("simple_observable_key", None)
simple_observable_value = kwargs.get("simple_observable_value", None)
simple_observable_description = kwargs.get(
"simple_observable_description", None
)
x_opencti_score = kwargs.get("x_opencti_score", None)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
external_references = kwargs.get("externalReferences", None)
granted_refs = kwargs.get("objectOrganization", None)
update = kwargs.get("update", False)
resolve_result_indicators = kwargs.get("resolve_result_indicators", True)
files = kwargs.get("files", None)
files_markings = kwargs.get("filesMarkings", None)
no_trigger_import = kwargs.get("noTriggerImport", None)
embedded = kwargs.get("embedded", None)
upsert_operations = kwargs.get("upsert_operations", None)
create_indicator = (
observable_data["x_opencti_create_indicator"]
if "x_opencti_create_indicator" in observable_data
else kwargs.get("createIndicator", False)
)
attribute = None
if simple_observable_key is not None:
key_split = simple_observable_key.split(".")
type = key_split[0].title()
attribute = key_split[1]
if attribute not in ["hashes", "extensions"]:
observable_data[attribute] = simple_observable_value
else:
type = (
observable_data["type"].title() if "type" in observable_data else None
)
if type is None:
return
if type.lower() == "file":
type = "StixFile"
elif type.lower() == "ipv4-addr":
type = "IPv4-Addr"
elif type.lower() == "ipv6-addr":
type = "IPv6-Addr"
elif type.lower() == "persona":
type = "Persona"
elif type.lower() == "ssh-key":
type = "SSH-Key"
elif type.lower() == "ai-prompt":
type = "AI-Prompt"
elif type.lower() == "hostname" or type.lower() == "x-opencti-hostname":
type = "Hostname"
elif type.lower() == "payment-card" or type.lower() == "x-opencti-payment-card":
type = "Payment-Card"
elif type.lower() == "credential" or type.lower() == "x-opencti-credential":
type = "Credential"
elif (
type.lower() == "tracking-number"
or type.lower() == "x-opencti-tracking-number"
):
type = "Tracking-Number"
elif (
type.lower() == "cryptocurrency-wallet"
or type.lower() == "x-opencti-cryptocurrency-wallet"
):
type = "Cryptocurrency-Wallet"
elif type.lower() == "user-agent" or type.lower() == "x-opencti-user-agent":
type = "User-Agent"
elif (
type.lower() == "cryptographic-key"
or type.lower() == "x-opencti-cryptographic-key"
):
type = "Cryptographic-Key"
elif type.lower() == "imei" or type.lower() == "x-opencti-imei":
type = "IMEI"
elif type.lower() == "iccid" or type.lower() == "x-opencti-iccid":
type = "ICCID"
elif type.lower() == "imsi" or type.lower() == "x-opencti-imsi":
type = "IMSI"
elif type.lower() == "text" or type.lower() == "x-opencti-text":
type = "Text"
if "x_opencti_description" in observable_data:
x_opencti_description = observable_data["x_opencti_description"]
else:
x_opencti_description = self.opencti.get_attribute_in_extension(
"description", observable_data
)
if simple_observable_description is not None:
x_opencti_description = simple_observable_description
if "x_opencti_score" in observable_data:
x_opencti_score = observable_data["x_opencti_score"]
elif (
self.opencti.get_attribute_in_extension("score", observable_data)
is not None
):
x_opencti_score = self.opencti.get_attribute_in_extension(
"score", observable_data
)
stix_id = observable_data["id"] if "id" in observable_data else None
if simple_observable_id is not None:
stix_id = simple_observable_id
hashes = []
if (
simple_observable_key is not None
and simple_observable_key.lower() == "file.hashes.md5"
):
hashes.append({"algorithm": "MD5", "hash": simple_observable_value})
if (
simple_observable_key is not None
and simple_observable_key.lower() == "file.hashes.sha-1"
):
hashes.append({"algorithm": "SHA-1", "hash": simple_observable_value})
if (
simple_observable_key is not None
and simple_observable_key.lower() == "file.hashes.sha-256"
):
hashes.append({"algorithm": "SHA-256", "hash": simple_observable_value})
if "hashes" in observable_data:
for key, value in observable_data["hashes"].items():
hashes.append({"algorithm": key, "hash": value})
if type is not None:
self.opencti.app_logger.info(
"Creating Stix-Cyber-Observable",
{"type": type, "create_indicator": create_indicator},
)
input_variables = {
"type": type,
"stix_id": stix_id,
"x_opencti_score": x_opencti_score,
"x_opencti_description": x_opencti_description,
"createIndicator": create_indicator,
"createdBy": created_by,
"objectMarking": object_marking,
"objectOrganization": granted_refs,
"objectLabel": object_label,
"externalReferences": external_references,
"update": update,
"upsertOperations": upsert_operations,
}
query = (
"""
mutation StixCyberObservableAdd(
$type: String!,
$stix_id: StixId,
$x_opencti_score: Int,
$x_opencti_description: String,
$createIndicator: Boolean,
$createdBy: String,
$objectMarking: [String],
$objectLabel: [String],
$objectOrganization: [String],
$externalReferences: [String],
$update: Boolean,
$upsertOperations: [EditInput!],
$AutonomousSystem: AutonomousSystemAddInput,
$Directory: DirectoryAddInput,
$DomainName: DomainNameAddInput,
$EmailAddr: EmailAddrAddInput,
$EmailMessage: EmailMessageAddInput,
$EmailMimePartType: EmailMimePartTypeAddInput,
$Artifact: ArtifactAddInput,
$StixFile: StixFileAddInput,
$X509Certificate: X509CertificateAddInput,
$IPv4Addr: IPv4AddrAddInput,
$IPv6Addr: IPv6AddrAddInput,
$MacAddr: MacAddrAddInput,
$Mutex: MutexAddInput,
$NetworkTraffic: NetworkTrafficAddInput,
$Process: ProcessAddInput,
$Software: SoftwareAddInput,
$Url: UrlAddInput,
$UserAccount: UserAccountAddInput,
$WindowsRegistryKey: WindowsRegistryKeyAddInput,
$WindowsRegistryValueType: WindowsRegistryValueTypeAddInput,
$CryptographicKey: CryptographicKeyAddInput,
$CryptocurrencyWallet: CryptocurrencyWalletAddInput,
$Hostname: HostnameAddInput
$Text: TextAddInput,
$UserAgent: UserAgentAddInput
$BankAccount: BankAccountAddInput
$PhoneNumber: PhoneNumberAddInput
$Credential: CredentialAddInput
$TrackingNumber: TrackingNumberAddInput
$PaymentCard: PaymentCardAddInput
$Persona: PersonaAddInput
$MediaContent: MediaContentAddInput
$SSHKey: SSHKeyAddInput
$AIPrompt: AIPromptAddInput
$IMEI: IMEIAddInput
$ICCID: ICCIDAddInput
$IMSI: IMSIAddInput
) {
stixCyberObservableAdd(
type: $type,
stix_id: $stix_id,
x_opencti_score: $x_opencti_score,
x_opencti_description: $x_opencti_description,
createIndicator: $createIndicator,
createdBy: $createdBy,
objectMarking: $objectMarking,
objectLabel: $objectLabel,
update: $update,
upsertOperations: $upsertOperations,
externalReferences: $externalReferences,
objectOrganization: $objectOrganization,
AutonomousSystem: $AutonomousSystem,
Directory: $Directory,
DomainName: $DomainName,
EmailAddr: $EmailAddr,
EmailMessage: $EmailMessage,
EmailMimePartType: $EmailMimePartType,
Artifact: $Artifact,
StixFile: $StixFile,
X509Certificate: $X509Certificate,
IPv4Addr: $IPv4Addr,
IPv6Addr: $IPv6Addr,
MacAddr: $MacAddr,
Mutex: $Mutex,
NetworkTraffic: $NetworkTraffic,
Process: $Process,
Software: $Software,
Url: $Url,
UserAccount: $UserAccount,
WindowsRegistryKey: $WindowsRegistryKey,
WindowsRegistryValueType: $WindowsRegistryValueType,
CryptographicKey: $CryptographicKey,
CryptocurrencyWallet: $CryptocurrencyWallet,
Hostname: $Hostname,
Text: $Text,
UserAgent: $UserAgent
BankAccount: $BankAccount
PhoneNumber: $PhoneNumber
Credential: $Credential
TrackingNumber: $TrackingNumber
PaymentCard: $PaymentCard
Persona: $Persona
MediaContent: $MediaContent
SSHKey: $SSHKey
AIPrompt: $AIPrompt
IMEI: $IMEI
ICCID: $ICCID
IMSI: $IMSI
) {
id
standard_id
entity_type
parent_types
"""
+ (
"""
indicators {
edges {
node {
id
pattern
pattern_type
}
}
}
"""
if resolve_result_indicators
else ""
)
+ """
}
}
"""
)
if type == "Autonomous-System":
input_variables["AutonomousSystem"] = {
"number": observable_data["number"],
"name": (
observable_data["name"] if "name" in observable_data else None
),
"rir": observable_data["rir"] if "rir" in observable_data else None,
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Directory":
input_variables["Directory"] = {
"path": observable_data["path"],
"path_enc": (
observable_data["path_enc"]
if "path_enc" in observable_data
else None
),
"ctime": (
observable_data["ctime"] if "ctime" in observable_data else None
),
"mtime": (
observable_data["mtime"] if "mtime" in observable_data else None
),
"atime": (
observable_data["atime"] if "atime" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Domain-Name":
input_variables["DomainName"] = {
"value": observable_data["value"],
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
if attribute is not None:
input_variables["DomainName"][attribute] = simple_observable_value
elif type == "Email-Addr":
input_variables["EmailAddr"] = {
"value": observable_data["value"],
"display_name": (
observable_data["display_name"]
if "display_name" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Email-Message":
input_variables["EmailMessage"] = {
"is_multipart": (
observable_data["is_multipart"]
if "is_multipart" in observable_data
else None
),
"attribute_date": (
observable_data["date"] if "date" in observable_data else None
),
"message_id": (
observable_data["message_id"]
if "message_id" in observable_data
else None
),
"subject": (
observable_data["subject"]
if "subject" in observable_data
else None
),
"received_lines": (
observable_data["received_lines"]
if "received_lines" in observable_data
else None
),
"body": (
observable_data["body"] if "body" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Email-Mime-Part-Type":
input_variables["EmailMimePartType"] = {
"body": (
observable_data["body"] if "body" in observable_data else None
),
"content_type": (
observable_data["content_type"]
if "content_type" in observable_data
else None
),
"content_disposition": (
observable_data["content_disposition"]
if "content_disposition" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Artifact":
if (
"x_opencti_additional_names" not in observable_data
and self.opencti.get_attribute_in_extension(
"additional_names", observable_data
)
is not None
):
observable_data["x_opencti_additional_names"] = (
self.opencti.get_attribute_in_extension(
"additional_names", observable_data
)
)
input_variables["Artifact"] = {
"hashes": hashes if len(hashes) > 0 else None,
"mime_type": (
observable_data["mime_type"]
if "mime_type" in observable_data
else None
),
"url": observable_data["url"] if "url" in observable_data else None,
"encryption_algorithm": (
observable_data["encryption_algorithm"]
if "encryption_algorithm" in observable_data
else None
),
"decryption_key": (
observable_data["decryption_key"]
if "decryption_key" in observable_data
else None
),
"x_opencti_additional_names": (
observable_data["x_opencti_additional_names"]
if "x_opencti_additional_names" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "StixFile":
if (
"x_opencti_additional_names" not in observable_data
and self.opencti.get_attribute_in_extension(
"additional_names", observable_data
)
is not None
):
observable_data["x_opencti_additional_names"] = (
self.opencti.get_attribute_in_extension(
"additional_names", observable_data
)
)
input_variables["StixFile"] = {
"hashes": hashes if len(hashes) > 0 else None,
"size": (
observable_data["size"] if "size" in observable_data else None
),
"name": (
observable_data["name"] if "name" in observable_data else None
),
"name_enc": (
observable_data["name_enc"]
if "name_enc" in observable_data
else None
),
"magic_number_hex": (
observable_data["magic_number_hex"]
if "magic_number_hex" in observable_data
else None
),
"mime_type": (
observable_data["mime_type"]
if "mime_type" in observable_data
else None
),
"mtime": (
observable_data["mtime"] if "mtime" in observable_data else None
),
"ctime": (
observable_data["ctime"] if "ctime" in observable_data else None
),
"atime": (
observable_data["atime"] if "atime" in observable_data else None
),
"x_opencti_additional_names": (
observable_data["x_opencti_additional_names"]
if "x_opencti_additional_names" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "X509-Certificate":
input_variables["X509Certificate"] = {
"hashes": hashes if len(hashes) > 0 else None,
"is_self_signed": (
observable_data["is_self_signed"]
if "is_self_signed" in observable_data
else False
),
"version": (
observable_data["version"]
if "version" in observable_data
else None
),
"serial_number": (
observable_data["serial_number"]
if "serial_number" in observable_data
else None
),
"signature_algorithm": (
observable_data["signature_algorithm"]
if "signature_algorithm" in observable_data
else None
),
"issuer": (
observable_data["issuer"]
if "issuer" in observable_data
else None
),
"validity_not_before": (
observable_data["validity_not_before"]
if "validity_not_before" in observable_data
else None
),
"validity_not_after": (
observable_data["validity_not_after"]
if "validity_not_after" in observable_data
else None
),
"subject": (
observable_data["subject"]
if "subject" in observable_data
else None
),
"subject_public_key_algorithm": (
observable_data["subject_public_key_algorithm"]
if "subject_public_key_algorithm" in observable_data
else None
),
"subject_public_key_modulus": (
observable_data["subject_public_key_modulus"]
if "subject_public_key_modulus" in observable_data
else None
),
"subject_public_key_exponent": (
observable_data["subject_public_key_exponent"]
if "subject_public_key_exponent" in observable_data
else None
),
"basic_constraints": (
observable_data["basic_constraints"]
if "basic_constraints" in observable_data
else None
),
"name_constraints": (
observable_data["name_constraints"]
if "name_constraints" in observable_data
else None
),
"policy_constraints": (
observable_data["policy_constraints"]
if "policy_constraints" in observable_data
else None
),
"key_usage": (
observable_data["key_usage"]
if "key_usage" in observable_data
else None
),
"extended_key_usage": (
observable_data["extended_key_usage"]
if "extended_key_usage" in observable_data
else None
),
"subject_key_identifier": (
observable_data["subject_key_identifier"]
if "subject_key_identifier" in observable_data
else None
),
"authority_key_identifier": (
observable_data["authority_key_identifier"]
if "authority_key_identifier" in observable_data
else None
),
"subject_alternative_name": (
observable_data["subject_alternative_name"]
if "subject_alternative_name" in observable_data
else None
),
"issuer_alternative_name": (
observable_data["issuer_alternative_name"]
if "issuer_alternative_name" in observable_data
else None
),
"subject_directory_attributes": (
observable_data["subject_directory_attributes"]
if "subject_directory_attributes" in observable_data
else None
),
"crl_distribution_points": (
observable_data["crl_distribution_points"]
if "crl_distribution_points" in observable_data
else None
),
"inhibit_any_policy": (
observable_data["inhibit_any_policy"]
if "inhibit_any_policy" in observable_data
else None
),
"private_key_usage_period_not_before": (
observable_data["private_key_usage_period_not_before"]
if "private_key_usage_period_not_before" in observable_data
else None
),
"private_key_usage_period_not_after": (
observable_data["private_key_usage_period_not_after"]
if "private_key_usage_period_not_after" in observable_data
else None
),
"certificate_policies": (
observable_data["certificate_policies"]
if "certificate_policies" in observable_data
else None
),
"policy_mappings": (
observable_data["policy_mappings"]
if "policy_mappings" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "SSH-Key" or type.lower() == "ssh-key":
input_variables["SSHKey"] = {
"key_type": (
observable_data["key_type"]
if "key_type" in observable_data
else None
),
"public_key": (
observable_data["public_key"]
if "public_key" in observable_data
else None
),
"fingerprint_sha256": (
observable_data["fingerprint_sha256"]
if "fingerprint_sha256" in observable_data
else False
),
"fingerprint_md5": (
observable_data["fingerprint_md5"]
if "fingerprint_md5" in observable_data
else None
),
"key_length": (
observable_data["key_length"]
if "key_length" in observable_data
else None
),
"comment": (
observable_data["comment"]
if "comment" in observable_data
else None
),
"created": (
observable_data["created"]
if "created" in observable_data
else None
),
"expiration_date": (
observable_data["expiration_date"]
if "expiration_date" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "IPv4-Addr":
input_variables["IPv4Addr"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "IPv6-Addr":
input_variables["IPv6Addr"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Mac-Addr":
input_variables["MacAddr"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Mutex":
input_variables["Mutex"] = {
"name": (
observable_data["name"] if "name" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Network-Traffic":
input_variables["NetworkTraffic"] = {
"start": (
observable_data["start"] if "start" in observable_data else None
),
"end": observable_data["end"] if "end" in observable_data else None,
"is_active": (
observable_data["is_active"]
if "is_active" in observable_data
else None
),
"src_port": (
observable_data["src_port"]
if "src_port" in observable_data
else None
),
"dst_port": (
observable_data["dst_port"]
if "dst_port" in observable_data
else None
),
"networkSrc": (
observable_data["src_ref"]
if "src_ref" in observable_data
else None
),
"networkDst": (
observable_data["dst_ref"]
if "dst_ref" in observable_data
else None
),
"protocols": (
observable_data["protocols"]
if "protocols" in observable_data
else None
),
"src_byte_count": (
observable_data["src_byte_count"]
if "src_byte_count" in observable_data
else None
),
"dst_byte_count": (
observable_data["dst_byte_count"]
if "dst_byte_count" in observable_data
else None
),
"src_packets": (
observable_data["src_packets"]
if "src_packets" in observable_data
else None
),
"dst_packets": (
observable_data["dst_packets"]
if "dst_packets" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Process":
input_variables["Process"] = {
"is_hidden": (
observable_data["is_hidden"]
if "is_hidden" in observable_data
else None
),
"pid": observable_data["pid"] if "pid" in observable_data else None,
"created_time": (
observable_data["created_time"]
if "created_time" in observable_data
else None
),
"cwd": observable_data["cwd"] if "cwd" in observable_data else None,
"command_line": (
observable_data["command_line"]
if "command_line" in observable_data
else None
),
"environment_variables": (
observable_data["environment_variables"]
if "environment_variables" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Software":
if (
"x_opencti_product" not in observable_data
and self.opencti.get_attribute_in_extension(
"x_opencti_product", observable_data
)
is not None
):
observable_data["x_opencti_product"] = (
self.opencti.get_attribute_in_extension(
"x_opencti_product", observable_data
)
)
input_variables["Software"] = {
"name": (
observable_data["name"] if "name" in observable_data else None
),
"cpe": observable_data["cpe"] if "cpe" in observable_data else None,
"swid": (
observable_data["swid"] if "swid" in observable_data else None
),
"languages": (
observable_data["languages"]
if "languages" in observable_data
else None
),
"vendor": (
observable_data["vendor"]
if "vendor" in observable_data
else None
),
"version": (
observable_data["version"]
if "version" in observable_data
else None
),
"x_opencti_product": (
observable_data["x_opencti_product"]
if "x_opencti_product" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Url":
input_variables["Url"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "User-Account":
input_variables["UserAccount"] = {
"user_id": (
observable_data["user_id"]
if "user_id" in observable_data
else None
),
"credential": (
observable_data["credential"]
if "credential" in observable_data
else None
),
"account_login": (
observable_data["account_login"]
if "account_login" in observable_data
else None
),
"account_type": (
observable_data["account_type"]
if "account_type" in observable_data
else None
),
"display_name": (
observable_data["display_name"]
if "display_name" in observable_data
else None
),
"is_service_account": (
observable_data["is_service_account"]
if "is_service_account" in observable_data
else None
),
"is_privileged": (
observable_data["is_privileged"]
if "is_privileged" in observable_data
else None
),
"can_escalate_privs": (
observable_data["can_escalate_privs"]
if "can_escalate_privs" in observable_data
else None
),
"is_disabled": (
observable_data["is_disabled"]
if "is_disabled" in observable_data
else None
),
"account_created": (
observable_data["account_created"]
if "account_created" in observable_data
else None
),
"account_expires": (
observable_data["account_expires"]
if "account_expires" in observable_data
else None
),
"credential_last_changed": (
observable_data["credential_last_changed"]
if "credential_last_changed" in observable_data
else None
),
"account_first_login": (
observable_data["account_first_login"]
if "account_first_login" in observable_data
else None
),
"account_last_login": (
observable_data["account_last_login"]
if "account_last_login" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Windows-Registry-Key":
input_variables["WindowsRegistryKey"] = {
"attribute_key": (
observable_data["key"] if "key" in observable_data else None
),
"modified_time": (
observable_data["modified_time"]
if "modified_time" in observable_data
else None
),
"number_of_subkeys": (
observable_data["number_of_subkeys"]
if "number_of_subkeys" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Windows-Registry-Value-Type":
input_variables["WindowsRegistryValueType"] = {
"name": (
observable_data["name"] if "name" in observable_data else None
),
"data": (
observable_data["data"] if "data" in observable_data else None
),
"data_type": (
observable_data["data_type"]
if "data_type" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "User-Agent":
input_variables["UserAgent"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Cryptographic-Key":
input_variables["CryptographicKey"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Hostname":
input_variables["Hostname"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "AI-Prompt":
input_variables["AIPrompt"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
}
elif type == "Text":
input_variables["Text"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Bank-Account":
input_variables["BankAccount"] = {
"iban": (
observable_data["iban"] if "iban" in observable_data else None
),
"bic": observable_data["bic"] if "bic" in observable_data else None,
"account_number": (
observable_data["account_number"]
if "account_number" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Phone-Number":
input_variables["PhoneNumber"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Payment-Card":
input_variables["PaymentCard"] = {
"card_number": (
observable_data["card_number"]
if "card_number" in observable_data
else None
),
"expiration_date": (
observable_data["expiration_date"]
if "expiration_date" in observable_data
else None
),
"cvv": observable_data["cvv"] if "cvv" in observable_data else None,
"holder_name": (
observable_data["holder_name"]
if "holder_name" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Media-Content":
input_variables["MediaContent"] = {
"title": (
observable_data["title"] if "title" in observable_data else None
),
"content": (
observable_data["content"]
if "content" in observable_data
else None
),
"media_category": (
observable_data["media_category"]
if "media_category" in observable_data
else None
),
"url": observable_data["url"] if "url" in observable_data else None,
"publication_date": (
observable_data["publication_date"]
if "publication_date" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Persona":
input_variables["Persona"] = {
"persona_name": (
observable_data["persona_name"]
if "persona_name" in observable_data
else None
),
"persona_type": (
observable_data["persona_type"]
if "persona_type" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Payment-Card" or type.lower() == "x-opencti-payment-card":
input_variables["PaymentCard"] = {
"card_number": (
observable_data["card_number"]
if "card_number" in observable_data
else None
),
"expiration_date": (
observable_data["expiration_date"]
if "expiration_date" in observable_data
else None
),
"cvv": observable_data["cvv"] if "cvv" in observable_data else None,
"holder_name": (
observable_data["holder_name"]
if "holder_name" in observable_data
else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif (
type == "Cryptocurrency-Wallet"
or type.lower() == "x-opencti-cryptocurrency-wallet"
):
input_variables["CryptocurrencyWallet"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "Credential" or type.lower() == "x-opencti-credential":
input_variables["Credential"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif (
type == "Tracking-Number" or type.lower() == "x-opencti-tracking-number"
):
input_variables["TrackingNumber"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
}
elif type == "IMEI" or type.lower() == "x-opencti-imei":
input_variables["IMEI"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
}
elif type == "ICCID" or type.lower() == "x-opencti-iccid":
input_variables["ICCID"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
}
elif type == "IMSI" or type.lower() == "x-opencti-imsi":
input_variables["IMSI"] = {
"value": (
observable_data["value"] if "value" in observable_data else None
),
}
result = self.opencti.query(query, input_variables)
if "payload_bin" in observable_data and "mime_type" in observable_data:
self.add_file(
id=result["data"]["stixCyberObservableAdd"]["id"],
file_name=(
observable_data["x_opencti_additional_names"][0]
if "x_opencti_additional_names" in observable_data
and len(observable_data["x_opencti_additional_names"]) > 0
else "artifact.bin"
),
data=base64.b64decode(observable_data["payload_bin"]),
mime_type=observable_data["mime_type"],
)
return self.opencti.process_multiple_fields(
result["data"]["stixCyberObservableAdd"]
)
else:
self.opencti.app_logger.error("Missing parameters: type")
return None
[docs]
def upload_artifact(self, **kwargs):
"""Upload an artifact.
:param file_name: the file name or path
:type file_name: str
:param data: the file data (optional, reads from file_name if not provided)
:type data: bytes
:param mime_type: the MIME type (default: text/plain)
:type mime_type: str
:param x_opencti_description: description for the artifact
:type x_opencti_description: str
:param createdBy: the author ID
:type createdBy: str
:param objectMarking: list of marking definition IDs
:type objectMarking: list
:param objectLabel: list of label IDs
:type objectLabel: list
:param createIndicator: whether to create an indicator (default: False)
:type createIndicator: bool
:return: Stix-Observable object
:rtype: dict or None
"""
file_name = kwargs.get("file_name", None)
data = kwargs.get("data", None)
mime_type = kwargs.get("mime_type", "text/plain")
x_opencti_description = kwargs.get("x_opencti_description", False)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
create_indicator = kwargs.get("createIndicator", False)
if file_name is not None and mime_type is not None:
final_file_name = os.path.basename(file_name)
self.opencti.app_logger.info(
"Creating Stix-Cyber-Observable {artifact} with indicator",
{"create_indicator": create_indicator},
)
query = """
mutation ArtifactImport($file: Upload!, $x_opencti_description: String, $createdBy: String, $objectMarking: [String], $objectLabel: [String]) {
artifactImport(file: $file, x_opencti_description: $x_opencti_description, createdBy: $createdBy, objectMarking: $objectMarking, objectLabel: $objectLabel) {
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
}
}
}
observable_value
x_opencti_description
x_opencti_score
indicators {
edges {
node {
id
pattern
pattern_type
}
}
}
mime_type
payload_bin
url
encryption_algorithm
decryption_key
hashes {
algorithm
hash
}
importFiles {
edges {
node {
id
name
size
}
}
}
}
}
"""
if data is None:
data = open(file_name, "rb")
if file_name.endswith(".json"):
mime_type = "application/json"
else:
mime_type = magic.from_file(file_name, mime=True)
result = self.opencti.query(
query,
{
"file": (self.opencti.file(final_file_name, data, mime_type)),
"x_opencti_description": x_opencti_description,
"createdBy": created_by,
"objectMarking": object_marking,
"objectLabel": object_label,
},
)
return self.opencti.process_multiple_fields(
result["data"]["artifactImport"]
)
else:
self.opencti.app_logger.error("Missing parameters: type")
return None
[docs]
def update_field(self, **kwargs):
"""Update a Stix-Observable object field.
:param id: the Stix-Observable id
:type id: str
:param input: the input of the field to update
:type input: list
:return: The updated Stix-Observable object
:rtype: dict or None
"""
id = kwargs.get("id", None)
input = kwargs.get("input", None)
if id is not None and input is not None:
self.opencti.app_logger.info("Updating Stix-Observable", {"id": id})
query = """
mutation StixCyberObservableEdit($id: ID!, $input: [EditInput]!) {
stixCyberObservableEdit(id: $id) {
fieldPatch(input: $input) {
id
standard_id
entity_type
}
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"input": input,
},
)
return self.opencti.process_multiple_fields(
result["data"]["stixCyberObservableEdit"]["fieldPatch"]
)
else:
self.opencti.app_logger.error(
"[opencti_stix_cyber_observable_update_field] Missing parameters: id and input",
)
return None
[docs]
def delete(self, **kwargs):
"""Delete a Stix-Observable.
:param id: the Stix-Observable id
:type id: str
:return: None
:rtype: None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info("Deleting Stix-Observable", {"id": id})
query = """
mutation StixCyberObservableEdit($id: ID!) {
stixCyberObservableEdit(id: $id) {
delete
}
}
"""
self.opencti.query(query, {"id": id})
else:
self.opencti.app_logger.error(
"[opencti_stix_cyber_observable_delete] Missing parameters: id"
)
return None
[docs]
def update_created_by(self, **kwargs):
"""Update the Identity author of a Stix-Cyber-Observable object (created_by).
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param identity_id: the id of the Identity
:type identity_id: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
identity_id = kwargs.get("identity_id", None)
if id is not None:
self.opencti.app_logger.info(
"Updating author of Stix-Cyber-Observable with Identity",
{"id": id, "identity_id": identity_id},
)
custom_attributes = """
id
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
name
x_opencti_aliases
description
created
modified
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
"""
stix_domain_object = self.read(id=id, customAttributes=custom_attributes)
if stix_domain_object["createdBy"] is not None:
query = """
mutation StixCyberObservableEdit($id: ID!, $toId: StixRef! $relationship_type: String!) {
stixCyberObservableEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": stix_domain_object["createdBy"]["id"],
"relationship_type": "created-by",
},
)
if identity_id is not None:
# Add the new relation
query = """
mutation StixCyberObservableEdit($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCyberObservableEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
variables = {
"id": id,
"input": {
"toId": identity_id,
"relationship_type": "created-by",
},
}
self.opencti.query(query, variables)
else:
self.opencti.app_logger.error("Missing parameters: id")
return False
[docs]
def add_marking_definition(self, **kwargs):
"""Add a Marking-Definition object to Stix-Cyber-Observable object (object_marking_refs).
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param marking_definition_id: the id of the Marking-Definition
:type marking_definition_id: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
marking_definition_id = kwargs.get("marking_definition_id", None)
if id is not None and marking_definition_id is not None:
custom_attributes = """
id
objectMarking {
standard_id
entity_type
definition_type
definition
x_opencti_order
x_opencti_color
created
modified
}
"""
stix_cyber_observable = self.read(id=id, customAttributes=custom_attributes)
if stix_cyber_observable is None:
self.opencti.app_logger.error(
"Cannot add Marking-Definition, entity not found"
)
return False
if marking_definition_id in stix_cyber_observable["objectMarkingIds"]:
return True
else:
self.opencti.app_logger.info(
"Adding Marking-Definition to Stix-Cyber-Observable",
{"marking_definition_id": marking_definition_id, "id": id},
)
query = """
mutation StixCyberObservableAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCyberObservableEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": marking_definition_id,
"relationship_type": "object-marking",
},
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and marking_definition_id"
)
return False
[docs]
def remove_marking_definition(self, **kwargs):
"""Remove a Marking-Definition object from Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param marking_definition_id: the id of the Marking-Definition
:type marking_definition_id: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
marking_definition_id = kwargs.get("marking_definition_id", None)
if id is not None and marking_definition_id is not None:
self.opencti.app_logger.info(
"Removing Marking-Definition from Stix-Cyber-Observable",
{"marking_definition_id": marking_definition_id, "id": id},
)
query = """
mutation StixCyberObservableRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCyberObservableEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": marking_definition_id,
"relationship_type": "object-marking",
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and marking_definition_id"
)
return False
[docs]
def add_label(self, **kwargs):
"""Add a Label object to Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param label_id: the id of the Label
:type label_id: str
:param label_name: the name of the Label (will create if not exists)
:type label_name: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
label_id = kwargs.get("label_id", None)
label_name = kwargs.get("label_name", None)
if label_name is not None:
label = self.opencti.label.read(
filters={
"mode": "and",
"filters": [{"key": "value", "values": [label_name]}],
"filterGroups": [],
}
)
if label:
label_id = label["id"]
else:
label = self.opencti.label.create(value=label_name)
label_id = label["id"]
if id is not None and label_id is not None:
self.opencti.app_logger.info(
"Adding label to Stix-Cyber-Observable",
{"label_id": label_id, "id": id},
)
query = """
mutation StixCyberObservableAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCyberObservableEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": label_id,
"relationship_type": "object-label",
},
},
)
return True
else:
self.opencti.app_logger.error("Missing parameters: id and label_id")
return False
[docs]
def remove_label(self, **kwargs):
"""Remove a Label object from Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param label_id: the id of the Label
:type label_id: str
:param label_name: the name of the Label (alternative to label_id)
:type label_name: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
label_id = kwargs.get("label_id", None)
label_name = kwargs.get("label_name", None)
if label_name is not None:
label = self.opencti.label.read(
filters={
"mode": "and",
"filters": [{"key": "value", "values": [label_name]}],
"filterGroups": [],
}
)
if label:
label_id = label["id"]
if id is not None and label_id is not None:
self.opencti.app_logger.info(
"Removing label from Stix-Cyber-Observable",
{"label_id": label_id, "id": id},
)
query = """
mutation StixCyberObservableRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCyberObservableEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": label_id,
"relationship_type": "object-label",
},
)
return True
else:
self.opencti.app_logger.error("Missing parameters: id and label_id")
return False
[docs]
def add_external_reference(self, **kwargs):
"""Add an External-Reference object to Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param external_reference_id: the id of the External-Reference
:type external_reference_id: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
external_reference_id = kwargs.get("external_reference_id", None)
if id is not None and external_reference_id is not None:
custom_attributes = """
id
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
}
}
}
"""
stix_domain_object = self.read(id=id, customAttributes=custom_attributes)
if stix_domain_object is None:
self.opencti.app_logger.error(
"Cannot add External-Reference, entity not found"
)
return False
if external_reference_id in stix_domain_object["externalReferencesIds"]:
return True
else:
self.opencti.app_logger.info(
"Adding External-Reference to Stix-Cyber-Observable",
{"external_reference_id": external_reference_id, "id": id},
)
query = """
mutation StixCyberObservabletEditRelationAdd($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCyberObservableEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": external_reference_id,
"relationship_type": "external-reference",
},
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and external_reference_id"
)
return False
[docs]
def remove_external_reference(self, **kwargs):
"""Remove an External-Reference object from Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param external_reference_id: the id of the External-Reference
:type external_reference_id: str
:return: True on success, False on failure
:rtype: bool
"""
id = kwargs.get("id", None)
external_reference_id = kwargs.get("external_reference_id", None)
if id is not None and external_reference_id is not None:
self.opencti.app_logger.info(
"Removing External-Reference from Stix-Cyber-Observable",
{"external_reference_id": external_reference_id, "id": id},
)
query = """
mutation StixCyberObservableRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCyberObservableEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": external_reference_id,
"relationship_type": "external-reference",
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and external_reference_id"
)
return False
[docs]
def push_list_export(
self,
entity_id,
entity_type,
file_name,
file_markings,
data,
list_filters="",
mime_type=None,
):
"""Push a list export for Stix-Cyber-Observables.
:param entity_id: the entity ID
:type entity_id: str
:param entity_type: the entity type
:type entity_type: str
:param file_name: the file name
:type file_name: str
:param file_markings: list of marking definition IDs for the file
:type file_markings: list
:param data: the file data
:type data: bytes
:param list_filters: the list filters (default: "")
:type list_filters: str
:param mime_type: the MIME type (optional)
:type mime_type: str
:return: None
:rtype: None
"""
query = """
mutation StixCyberObservablesExportPush(
$entity_id: String,
$entity_type: String!,
$file: Upload!,
$file_markings: [String]!,
$listFilters: String
) {
stixCyberObservablesExportPush(
entity_id: $entity_id,
entity_type: $entity_type,
file: $file,
file_markings: $file_markings,
listFilters: $listFilters
)
}
"""
if mime_type is None:
file = self.opencti.file(file_name, data)
else:
file = self.opencti.file(file_name, data, mime_type)
self.opencti.query(
query,
{
"entity_id": entity_id,
"entity_type": entity_type,
"file": file,
"file_markings": file_markings,
"listFilters": list_filters,
},
)
[docs]
def ask_for_enrichment(self, **kwargs) -> str:
"""Ask for enrichment of a Stix-Cyber-Observable.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:param connector_id: the id of the enrichment connector
:type connector_id: str
:return: The work ID
:rtype: str
"""
id = kwargs.get("id", None)
connector_id = kwargs.get("connector_id", None)
if id is None or connector_id is None:
self.opencti.app_logger.error("Missing parameters: id and connector_id")
return ""
query = """
mutation StixCoreObjectEnrichmentLinesMutation($id: ID!, $connectorId: ID!) {
stixCoreObjectEdit(id: $id) {
askEnrichment(connectorId: $connectorId) {
id
}
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"connectorId": connector_id,
},
)
# return work_id
return result["data"]["stixCoreObjectEdit"]["askEnrichment"]["id"]
[docs]
def reports(self, **kwargs):
"""Get the reports about a Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:return: List of reports
:rtype: list or None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info(
"Getting reports of the Stix-Cyber-Observable", {"id": id}
)
query = """
query StixCyberObservable($id: String!) {
stixCyberObservable(id: $id) {
reports {
edges {
node {
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
revoked
confidence
created
modified
name
description
report_types
published
}
}
}
}
}
"""
result = self.opencti.query(query, {"id": id})
processed_result = self.opencti.process_multiple_fields(
result["data"]["stixCyberObservable"]
)
if processed_result:
return processed_result["reports"]
else:
return []
else:
self.opencti.app_logger.error("Missing parameters: id")
return None
[docs]
def notes(self, **kwargs):
"""Get the notes about a Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:return: List of notes
:rtype: list or None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info(
"Getting notes of the Stix-Cyber-Observable", {"id": id}
)
query = """
query StixCyberObservable($id: String!) {
stixCyberObservable(id: $id) {
notes {
edges {
node {
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
revoked
confidence
created
modified
attribute_abstract
content
authors
note_types
likelihood
}
}
}
}
}
"""
result = self.opencti.query(query, {"id": id})
processed_result = self.opencti.process_multiple_fields(
result["data"]["stixCyberObservable"]
)
if processed_result:
return processed_result["notes"]
else:
return []
else:
self.opencti.app_logger.error("Missing parameters: id")
return None
[docs]
def observed_data(self, **kwargs):
"""Get the observed data of a Stix-Cyber-Observable object.
:param id: the id of the Stix-Cyber-Observable
:type id: str
:return: List of observed data
:rtype: list or None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info(
"Getting Observed-Data of the Stix-Cyber-Observable", {"id": id}
)
query = """
query StixCyberObservable($id: String!) {
stixCyberObservable(id: $id) {
observedData {
edges {
node {
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
revoked
confidence
created
modified
first_observed
last_observed
number_observed
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
}
}
"""
result = self.opencti.query(query, {"id": id})
processed_result = self.opencti.process_multiple_fields(
result["data"]["stixCyberObservable"]
)
if processed_result:
return processed_result["observedData"]
else:
return []
else:
self.opencti.app_logger.error("Missing parameters: id")
return None