# coding: utf-8
import json
class StixCoreObject:
    """Main StixCoreObject class for OpenCTI.

    Base class for managing STIX core objects in the OpenCTI platform.

    The GraphQL selection sets exposed as ``properties`` and
    ``properties_with_files`` are assembled from the private ``_*`` fragments
    below so that both variants (and the ``reports`` query) share a single
    definition of every common field.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    # Identification fields selected on every object.
    _BASE_FIELDS = """
        id
        standard_id
        entity_type
        parent_types
        spec_version
        created_at
        updated_at
    """

    # Organizations the object is shared with.
    _ORGANIZATION_FIELDS = """
        objectOrganization {
            id
            standard_id
            name
        }
    """

    # Author, marking definitions and labels.
    _AUTHORSHIP_FIELDS = """
        createdBy {
            ... on Identity {
                id
                standard_id
                entity_type
                parent_types
                spec_version
                identity_class
                name
                description
                roles
                contact_information
                x_opencti_aliases
                created
                modified
                objectLabel {
                    id
                    value
                    color
                }
            }
            ... on Organization {
                x_opencti_organization_type
                x_opencti_reliability
            }
            ... on Individual {
                x_opencti_firstname
                x_opencti_lastname
            }
        }
        objectMarking {
            id
            standard_id
            entity_type
            definition_type
            definition
            created
            modified
            x_opencti_order
            x_opencti_color
        }
        objectLabel {
            id
            value
            color
        }
    """

    # External references without their attached files.
    _EXTERNAL_REFERENCES = """
        externalReferences {
            edges {
                node {
                    id
                    standard_id
                    entity_type
                    source_name
                    description
                    url
                    hash
                    external_id
                    created
                    modified
                }
            }
        }
    """

    # External references including the files imported on each reference.
    _EXTERNAL_REFERENCES_WITH_FILES = """
        externalReferences {
            edges {
                node {
                    id
                    standard_id
                    entity_type
                    source_name
                    description
                    url
                    hash
                    external_id
                    created
                    modified
                    importFiles {
                        edges {
                            node {
                                id
                                name
                                size
                                metaData {
                                    mimetype
                                    version
                                }
                            }
                        }
                    }
                }
            }
        }
    """

    # Fields shared by every STIX domain object.
    _DOMAIN_OBJECT_FIELDS = """
        ... on StixDomainObject {
            revoked
            confidence
            created
            modified
        }
    """

    # Files imported on the object itself, with their markings.
    _IMPORT_FILES_FIELDS = """
        importFiles {
            edges {
                node {
                    id
                    name
                    size
                    metaData {
                        mimetype
                        version
                    }
                    objectMarking {
                        id
                        standard_id
                        entity_type
                        definition_type
                        definition
                        created
                        modified
                        x_opencti_order
                        x_opencti_color
                    }
                }
            }
        }
    """

    # Per-type inline fragments (domain objects, then cyber observables).
    _TYPE_FRAGMENTS = """
        ... on AttackPattern {
            name
            description
            aliases
            x_mitre_platforms
            x_mitre_permissions_required
            x_mitre_detection
            x_mitre_id
            killChainPhases {
                id
                standard_id
                entity_type
                kill_chain_name
                phase_name
                x_opencti_order
                created
                modified
            }
        }
        ... on Campaign {
            name
            description
            aliases
            first_seen
            last_seen
            objective
        }
        ... on Note {
            attribute_abstract
            content
            authors
            note_types
            likelihood
        }
        ... on ObservedData {
            first_observed
            last_observed
            number_observed
        }
        ... on Opinion {
            explanation
            authors
            opinion
        }
        ... on Report {
            name
            description
            report_types
            published
        }
        ... on Grouping {
            name
            description
            context
            objects {
                edges {
                    node {
                        ... on BasicObject {
                            id
                            entity_type
                            standard_id
                        }
                        ... on BasicRelationship {
                            id
                            entity_type
                            standard_id
                        }
                    }
                }
            }
        }
        ... on CourseOfAction {
            name
            description
            x_opencti_aliases
        }
        ... on DataComponent {
            name
            description
            dataSource {
                id
                standard_id
                entity_type
                parent_types
                spec_version
                created_at
                updated_at
                revoked
                confidence
                created
                modified
                name
                description
                x_mitre_platforms
                collection_layers
            }
        }
        ... on DataSource {
            name
            description
            x_mitre_platforms
            collection_layers
        }
        ... on Individual {
            name
            description
            contact_information
            x_opencti_aliases
            x_opencti_firstname
            x_opencti_lastname
        }
        ... on Organization {
            name
            description
            contact_information
            x_opencti_aliases
            x_opencti_organization_type
            x_opencti_reliability
        }
        ... on Sector {
            name
            description
            contact_information
            x_opencti_aliases
        }
        ... on System {
            name
            description
            contact_information
            x_opencti_aliases
        }
        ... on Indicator {
            pattern_type
            pattern_version
            pattern
            name
            description
            indicator_types
            valid_from
            valid_until
            x_opencti_score
            x_opencti_detection
            x_opencti_main_observable_type
        }
        ... on Infrastructure {
            name
            description
            aliases
            infrastructure_types
            first_seen
            last_seen
        }
        ... on IntrusionSet {
            name
            description
            aliases
            first_seen
            last_seen
            goals
            resource_level
            primary_motivation
            secondary_motivations
        }
        ... on City {
            name
            description
            latitude
            longitude
            precision
            x_opencti_aliases
        }
        ... on Country {
            name
            description
            latitude
            longitude
            precision
            x_opencti_aliases
        }
        ... on Region {
            name
            description
            latitude
            longitude
            precision
            x_opencti_aliases
        }
        ... on Position {
            name
            description
            latitude
            longitude
            precision
            x_opencti_aliases
            street_address
            postal_code
        }
        ... on Malware {
            name
            description
            aliases
            malware_types
            is_family
            first_seen
            last_seen
            architecture_execution_envs
            implementation_languages
            capabilities
            killChainPhases {
                id
                standard_id
                entity_type
                kill_chain_name
                phase_name
                x_opencti_order
                created
                modified
            }
        }
        ... on MalwareAnalysis {
            product
            version
            configuration_version
            modules
            analysis_engine_version
            analysis_definition_version
            submitted
            analysis_started
            analysis_ended
            result_name
            result
        }
        ... on ThreatActor {
            name
            description
            aliases
            threat_actor_types
            first_seen
            last_seen
            roles
            goals
            sophistication
            resource_level
            primary_motivation
            secondary_motivations
            personal_motivations
        }
        ... on Tool {
            name
            description
            aliases
            tool_types
            tool_version
            killChainPhases {
                id
                standard_id
                entity_type
                kill_chain_name
                phase_name
                x_opencti_order
                created
                modified
            }
        }
        ... on Vulnerability {
            name
            description
            x_opencti_aliases
            x_opencti_cvss_vector_string
            x_opencti_cvss_base_score
            x_opencti_cvss_base_severity
            x_opencti_cvss_attack_vector
            x_opencti_cvss_attack_complexity
            x_opencti_cvss_privileges_required
            x_opencti_cvss_user_interaction
            x_opencti_cvss_scope
            x_opencti_cvss_confidentiality_impact
            x_opencti_cvss_integrity_impact
            x_opencti_cvss_availability_impact
            x_opencti_cvss_exploit_code_maturity
            x_opencti_cvss_remediation_level
            x_opencti_cvss_report_confidence
            x_opencti_cvss_temporal_score
            x_opencti_cvss_v2_vector_string
            x_opencti_cvss_v2_base_score
            x_opencti_cvss_v2_access_vector
            x_opencti_cvss_v2_access_complexity
            x_opencti_cvss_v2_authentication
            x_opencti_cvss_v2_confidentiality_impact
            x_opencti_cvss_v2_integrity_impact
            x_opencti_cvss_v2_availability_impact
            x_opencti_cvss_v2_exploitability
            x_opencti_cvss_v2_remediation_level
            x_opencti_cvss_v2_report_confidence
            x_opencti_cvss_v2_temporal_score
            x_opencti_cvss_v4_vector_string
            x_opencti_cvss_v4_base_score
            x_opencti_cvss_v4_base_severity
            x_opencti_cvss_v4_attack_vector
            x_opencti_cvss_v4_attack_complexity
            x_opencti_cvss_v4_attack_requirements
            x_opencti_cvss_v4_privileges_required
            x_opencti_cvss_v4_user_interaction
            x_opencti_cvss_v4_confidentiality_impact_v
            x_opencti_cvss_v4_confidentiality_impact_s
            x_opencti_cvss_v4_integrity_impact_v
            x_opencti_cvss_v4_integrity_impact_s
            x_opencti_cvss_v4_availability_impact_v
            x_opencti_cvss_v4_availability_impact_s
            x_opencti_cvss_v4_exploit_maturity
            x_opencti_cwe
            x_opencti_cisa_kev
            x_opencti_epss_score
            x_opencti_epss_percentile
            x_opencti_score
        }
        ... on Incident {
            name
            description
            aliases
            first_seen
            last_seen
            objective
        }
        ... on Event {
            name
            description
        }
        ... on Channel {
            name
            description
            aliases
            channel_types
        }
        ... on Narrative {
            name
            description
            aliases
            narrative_types
        }
        ... on Case {
            name
            description
            objects {
                edges {
                    node {
                        ... on BasicObject {
                            id
                            entity_type
                            standard_id
                        }
                        ... on BasicRelationship {
                            id
                            entity_type
                            standard_id
                        }
                    }
                }
            }
        }
        ... on Feedback {
            name
            description
            objects {
                edges {
                    node {
                        ... on BasicObject {
                            id
                            entity_type
                            standard_id
                        }
                        ... on BasicRelationship {
                            id
                            entity_type
                            standard_id
                        }
                    }
                }
            }
        }
        ... on StixCyberObservable {
            observable_value
            indicators {
                edges {
                    node {
                        id
                        pattern
                        pattern_type
                    }
                }
            }
        }
        ... on AutonomousSystem {
            number
            name_alt: name
            rir
        }
        ... on Directory {
            path
            path_enc
            ctime
            mtime
            atime
        }
        ... on DomainName {
            value
        }
        ... on EmailAddr {
            value
            display_name
        }
        ... on EmailMessage {
            is_multipart
            attribute_date
            content_type
            message_id
            subject
            received_lines
            body
        }
        ... on Artifact {
            mime_type
            payload_bin
            url
            encryption_algorithm
            decryption_key
            hashes {
                algorithm
                hash
            }
            importFiles {
                edges {
                    node {
                        id
                        name
                        size
                    }
                }
            }
        }
        ... on StixFile {
            extensions
            size
            name_alt: name
            name_enc
            magic_number_hex
            mime_type
            ctime
            mtime
            atime
            x_opencti_additional_names
            hashes {
                algorithm
                hash
            }
        }
        ... on X509Certificate {
            is_self_signed
            version
            serial_number
            signature_algorithm
            issuer
            subject
            subject_public_key_algorithm
            subject_public_key_modulus
            subject_public_key_exponent
            validity_not_before
            validity_not_after
            hashes {
                algorithm
                hash
            }
            basic_constraints
            name_constraints
            policy_constraints
            key_usage
            extended_key_usage
            subject_key_identifier
            authority_key_identifier
            subject_alternative_name
            issuer_alternative_name
            subject_directory_attributes
            crl_distribution_points
            inhibit_any_policy
            private_key_usage_period_not_before
            private_key_usage_period_not_after
            certificate_policies
            policy_mappings
        }
        ... on IPv4Addr {
            value
        }
        ... on IPv6Addr {
            value
        }
        ... on MacAddr {
            value
        }
        ... on Mutex {
            name_alt: name
        }
        ... on NetworkTraffic {
            extensions
            start
            end
            is_active
            src_port
            dst_port
            protocols
            src_byte_count
            dst_byte_count
            src_packets
            dst_packets
        }
        ... on Process {
            extensions
            is_hidden
            pid
            created_time
            cwd
            command_line
            environment_variables
        }
        ... on Software {
            name_alt: name
            cpe
            swid
            languages
            vendor
            version
        }
        ... on Url {
            value
        }
        ... on UserAccount {
            extensions
            user_id
            credential
            account_login
            account_type
            display_name
            is_service_account
            is_privileged
            can_escalate_privs
            is_disabled
            account_created
            account_expires
            credential_last_changed
            account_first_login
            account_last_login
        }
        ... on WindowsRegistryKey {
            attribute_key
            modified_time
            number_of_subkeys
        }
        ... on WindowsRegistryValueType {
            name_alt: name
            data
            data_type
        }
        ... on CryptographicKey {
            value
        }
        ... on CryptocurrencyWallet {
            value
        }
        ... on Hostname {
            value
        }
        ... on Text {
            value
        }
        ... on UserAgent {
            value
        }
        ... on BankAccount {
            iban
            bic
            account_number
        }
        ... on PhoneNumber {
            value
        }
        ... on TrackingNumber {
            value
        }
        ... on Credential {
            value
        }
        ... on PaymentCard {
            card_number
            expiration_date
            cvv
            holder_name
        }
        ... on Persona {
            persona_name
            persona_type
        }
        ... on MediaContent {
            title
            content_alt: content
            media_category
            url
            publication_date
        }
    """

    def __init__(self, opencti):
        """Initialize the StixCoreObject instance.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        # Every method below talks to the platform through this client.
        self.opencti = opencti
        # Default selection set (no file listings on the object itself).
        self.properties = (
            self._BASE_FIELDS
            + self._ORGANIZATION_FIELDS
            + self._AUTHORSHIP_FIELDS
            + self._EXTERNAL_REFERENCES
            + self._DOMAIN_OBJECT_FIELDS
            + self._TYPE_FRAGMENTS
        )
        # Same selection set, plus imported files on the object and on its
        # external references.
        self.properties_with_files = (
            self._BASE_FIELDS
            + self._ORGANIZATION_FIELDS
            + self._AUTHORSHIP_FIELDS
            + self._EXTERNAL_REFERENCES_WITH_FILES
            + self._DOMAIN_OBJECT_FIELDS
            + self._IMPORT_FILES_FIELDS
            + self._TYPE_FRAGMENTS
        )

    def _select_attributes(self, custom_attributes, with_files):
        """Return the GraphQL selection set to request for an object.

        Caller-supplied ``custom_attributes`` always win; otherwise the
        default set with or without file listings is used.
        """
        if custom_attributes is not None:
            return custom_attributes
        return self.properties_with_files if with_files else self.properties

    def list(self, **kwargs):
        """List Stix-Core-Object objects.

        :param types: the list of types
        :type types: list
        :param filters: the filters to apply
        :type filters: dict
        :param search: the search keyword
        :type search: str
        :param first: return the first n rows from the after ID (or the beginning if not set)
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param orderBy: field to order results by
        :type orderBy: str
        :param orderMode: ordering mode (asc/desc)
        :type orderMode: str
        :param customAttributes: custom attributes to return
        :type customAttributes: str
        :param getAll: whether to retrieve all results
        :type getAll: bool
        :param withPagination: whether to include pagination info
        :type withPagination: bool
        :param withFiles: whether to include files
        :type withFiles: bool
        :return: List of Stix-Core-Object objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        custom_attributes = kwargs.get("customAttributes", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("withPagination", False)
        with_files = kwargs.get("withFiles", False)
        # Query variables shared by every page; only "after" changes.
        variables = {
            "types": kwargs.get("types", None),
            "filters": filters,
            "search": kwargs.get("search", None),
            "first": kwargs.get("first", 100),
            "after": kwargs.get("after", None),
            "orderBy": kwargs.get("orderBy", None),
            "orderMode": kwargs.get("orderMode", None),
        }

        self.opencti.app_logger.info(
            "Listing Stix-Core-Objects with filters", {"filters": json.dumps(filters)}
        )
        query = (
            """
            query StixCoreObjects($types: [String], $filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: StixCoreObjectsOrdering, $orderMode: OrderingMode) {
                stixCoreObjects(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
            """
            + self._select_attributes(custom_attributes, with_files)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
            """
        )
        result = self.opencti.query(query, variables)

        if not get_all:
            return self.opencti.process_multiple(
                result["data"]["stixCoreObjects"], with_pagination
            )

        # Follow the cursor until the server reports no further page,
        # accumulating in place instead of re-copying the list per page.
        final_data = []
        final_data.extend(
            self.opencti.process_multiple(result["data"]["stixCoreObjects"])
        )
        while result["data"]["stixCoreObjects"]["pageInfo"]["hasNextPage"]:
            after = result["data"]["stixCoreObjects"]["pageInfo"]["endCursor"]
            self.opencti.app_logger.debug(
                "Listing Stix-Core-Objects", {"after": after}
            )
            result = self.opencti.query(query, {**variables, "after": after})
            final_data.extend(
                self.opencti.process_multiple(result["data"]["stixCoreObjects"])
            )
        return final_data

    def read(self, **kwargs):
        """Read a Stix-Core-Object object.

        The object is looked up by ``id`` when given; otherwise the first
        result matching ``filters`` is returned.

        :param id: the id of the Stix-Core-Object
        :type id: str
        :param types: list of Stix Core Entity types
        :type types: list
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :param customAttributes: custom attributes to return
        :type customAttributes: str
        :param withFiles: whether to include files
        :type withFiles: bool
        :return: Stix-Core-Object object
        :rtype: dict or None
        """
        entity_id = kwargs.get("id", None)
        types = kwargs.get("types", None)
        filters = kwargs.get("filters", None)
        custom_attributes = kwargs.get("customAttributes", None)
        with_files = kwargs.get("withFiles", False)

        if entity_id is not None:
            self.opencti.app_logger.info(
                "Reading Stix-Core-Object", {"id": entity_id}
            )
            query = (
                """
                query StixCoreObject($id: String!) {
                    stixCoreObject(id: $id) {
                """
                + self._select_attributes(custom_attributes, with_files)
                + """
                    }
                }
                """
            )
            result = self.opencti.query(query, {"id": entity_id})
            return self.opencti.process_multiple_fields(
                result["data"]["stixCoreObject"]
            )

        if filters is not None:
            # Forward withFiles so both lookup paths honour it.
            result = self.list(
                types=types,
                filters=filters,
                customAttributes=custom_attributes,
                withFiles=with_files,
            )
            return result[0] if len(result) > 0 else None

        self.opencti.app_logger.error(
            "[opencti_stix_core_object] Missing parameters: id or filters"
        )
        return None

    def list_files(self, **kwargs):
        """List files of a Stix-Core-Object.

        :param id: the id of the Stix-Core-Object
        :type id: str
        :return: List of files associated with the object (empty when the
            object cannot be resolved)
        :rtype: list
        """
        entity_id = kwargs.get("id", None)
        self.opencti.app_logger.debug(
            "Listing files of Stix-Core-Object", {"id": entity_id}
        )
        query = """
            query StixCoreObject($id: String!) {
                stixCoreObject(id: $id) {
                    importFiles {
                        edges {
                            node {
                                id
                                name
                                size
                                metaData {
                                    mimetype
                                    version
                                }
                            }
                        }
                    }
                }
            }
        """
        result = self.opencti.query(query, {"id": entity_id})
        entity = self.opencti.process_multiple_fields(result["data"]["stixCoreObject"])
        # Same convention as reports(): an unresolved object yields no items.
        if not entity:
            return []
        return entity["importFiles"]

    def push_list_export(
        self,
        entity_id,
        entity_type,
        file_name,
        file_markings,
        data,
        list_filters="",
        mime_type=None,
    ):
        """Push a list export file for Stix-Core-Objects.

        :param entity_id: the id of the entity (optional)
        :type entity_id: str or None
        :param entity_type: the type of the entity
        :type entity_type: str
        :param file_name: the name of the file to export
        :type file_name: str
        :param file_markings: list of marking definition ids to apply
        :type file_markings: list
        :param data: the data content to export
        :type data: str
        :param list_filters: filters to apply on the list (default: "")
        :type list_filters: str
        :param mime_type: the MIME type of the file (optional)
        :type mime_type: str or None
        """
        query = """
            mutation StixCoreObjectsExportPush($entity_id: String, $entity_type: String!, $file: Upload!, $file_markings: [String]!, $listFilters: String) {
                stixCoreObjectsExportPush(entity_id: $entity_id, entity_type: $entity_type, file: $file, file_markings: $file_markings, listFilters: $listFilters)
            }
        """
        # Only pass a MIME type when the caller chose one, so the client's
        # own default applies otherwise.
        if mime_type is None:
            file = self.opencti.file(file_name, data)
        else:
            file = self.opencti.file(file_name, data, mime_type)
        self.opencti.query(
            query,
            {
                "entity_id": entity_id,
                "entity_type": entity_type,
                "file": file,
                "file_markings": file_markings,
                "listFilters": list_filters,
            },
        )

    def push_analysis(
        self,
        entity_id,
        file_name,
        data,
        content_source,
        content_type,
        analysis_type,
    ):
        """Push an analysis file for a Stix-Core-Object.

        :param entity_id: the id of the Stix-Core-Object
        :type entity_id: str
        :param file_name: the name of the analysis file
        :type file_name: str
        :param data: the analysis data content
        :type data: str
        :param content_source: the source of the content
        :type content_source: str
        :param content_type: the type of analysis content
        :type content_type: str
        :param analysis_type: the type of analysis
        :type analysis_type: str
        """
        query = """
            mutation StixCoreObjectEdit(
                $id: ID!, $file: Upload!, $contentSource: String!, $contentType: AnalysisContentType!, $analysisType: String!
            ) {
                stixCoreObjectEdit(id: $id) {
                    analysisPush(file: $file,contentSource: $contentSource,contentType: $contentType,analysisType: $analysisType){
                        id
                        name
                    }
                }
            }
        """
        file = self.opencti.file(file_name, data)
        self.opencti.query(
            query,
            {
                "id": entity_id,
                "file": file,
                "contentSource": content_source,
                "contentType": content_type,
                "analysisType": analysis_type,
            },
        )

    def reports(self, **kwargs):
        """Get the reports about a Stix-Core-Object object.

        :param id: the id of the Stix-Core-Object
        :type id: str
        :return: List of reports (empty when the object cannot be resolved),
            or None when no id is given
        :rtype: list or None
        """
        entity_id = kwargs.get("id", None)
        if entity_id is None:
            self.opencti.app_logger.error("Missing parameters: id")
            return None

        self.opencti.app_logger.info(
            "Getting reports of the Stix-Core-Object", {"id": entity_id}
        )
        query = (
            """
            query StixCoreObject($id: String!) {
                stixCoreObject(id: $id) {
                    reports {
                        edges {
                            node {
            """
            + self._BASE_FIELDS
            + self._AUTHORSHIP_FIELDS
            + self._EXTERNAL_REFERENCES_WITH_FILES
            + """
                                revoked
                                confidence
                                created
                                modified
                                name
                                description
                                report_types
                                published
                            }
                        }
                    }
                }
            }
            """
        )
        result = self.opencti.query(query, {"id": entity_id})
        processed_result = self.opencti.process_multiple_fields(
            result["data"]["stixCoreObject"]
        )
        if processed_result:
            return processed_result["reports"]
        return []

    def rule_apply(self, **kwargs):
        """Apply rule to Stix-Core-Object object.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param rule_id: the rule to apply
        :type rule_id: str
        """
        rule_id = kwargs.get("rule_id", None)
        element_id = kwargs.get("element_id", None)
        if element_id is None or rule_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot apply rule, missing parameters: id"
            )
            return None

        self.opencti.app_logger.info(
            "Apply rule stix_core_object", {"id": element_id}
        )
        query = """
            mutation StixCoreApplyRule($elementId: ID!, $ruleId: ID!) {
                ruleApply(elementId: $elementId, ruleId: $ruleId)
            }
        """
        self.opencti.query(query, {"elementId": element_id, "ruleId": rule_id})

    def rule_apply_async(self, **kwargs):
        """Apply rule to Stix-Core-Object object, repeating until complete.

        The mutation is re-sent until the server answers with a truthy
        ``ruleApplyAsync`` value.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param rule_id: the rule to apply
        :type rule_id: str
        :param execution_id: the id of the asynchronous execution
        :type execution_id: str
        """
        rule_id = kwargs.get("rule_id", None)
        element_id = kwargs.get("element_id", None)
        execution_id = kwargs.get("execution_id", None)
        if element_id is None or rule_id is None or execution_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot apply rule, missing parameters: id"
            )
            return None

        query = """
            mutation StixCoreApplyRule($elementId: ID!, $ruleId: ID!, $executionId: ID!) {
                ruleApplyAsync(elementId: $elementId, ruleId: $ruleId, executionId: $executionId)
            }
        """
        rule_apply_complete = False
        while not rule_apply_complete:
            self.opencti.app_logger.info(
                "Apply rule async stix_core_object",
                {"id": element_id, "execution_id": execution_id},
            )
            result = self.opencti.query(
                query,
                {
                    "elementId": element_id,
                    "ruleId": rule_id,
                    "executionId": execution_id,
                },
            )
            rule_apply_complete = result["data"]["ruleApplyAsync"]
        self.opencti.app_logger.info(
            "Apply rule async stix_core_object complete", {"id": element_id}
        )

    def rule_clear(self, **kwargs):
        """Apply rule clear to Stix-Core-Object object.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param rule_id: the rule to clear
        :type rule_id: str
        """
        rule_id = kwargs.get("rule_id", None)
        element_id = kwargs.get("element_id", None)
        if element_id is None or rule_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot clear rule, missing parameters: id"
            )
            return None

        self.opencti.app_logger.info(
            "Apply rule clear stix_core_object", {"id": element_id}
        )
        query = """
            mutation StixCoreClearRule($elementId: ID!, $ruleId: ID!) {
                ruleClear(elementId: $elementId, ruleId: $ruleId)
            }
        """
        self.opencti.query(query, {"elementId": element_id, "ruleId": rule_id})

    def rules_rescan(self, **kwargs):
        """Apply rules rescan to Stix-Core-Object object.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        """
        element_id = kwargs.get("element_id", None)
        if element_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot rescan rule, missing parameters: id"
            )
            return None

        self.opencti.app_logger.info(
            "Apply rules rescan stix_core_object", {"id": element_id}
        )
        query = """
            mutation StixCoreRescanRules($elementId: ID!) {
                rulesRescan(elementId: $elementId)
            }
        """
        self.opencti.query(query, {"elementId": element_id})

    def rule_rescan_async(self, **kwargs):
        """Apply rules rescan to Stix-Core-Object object, repeating until complete.

        The mutation is re-sent until the server answers with a truthy
        ``rulesRescanAsync`` value.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param execution_id: the id of the asynchronous execution
        :type execution_id: str
        """
        element_id = kwargs.get("element_id", None)
        execution_id = kwargs.get("execution_id", None)
        if element_id is None or execution_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot rescan rule, missing parameters: id"
            )
            return None

        # $executionId must be declared in the operation signature and
        # passed as a named argument, mirroring rule_apply_async.
        query = """
            mutation StixCoreRescanRules($elementId: ID!, $executionId: ID!) {
                rulesRescanAsync(elementId: $elementId, executionId: $executionId)
            }
        """
        rule_rescan_complete = False
        while not rule_rescan_complete:
            self.opencti.app_logger.info(
                "Apply rules rescan stix_core_object", {"id": element_id}
            )
            result = self.opencti.query(
                query, {"elementId": element_id, "executionId": execution_id}
            )
            rule_rescan_complete = result["data"]["rulesRescanAsync"]
        self.opencti.app_logger.info(
            "Rescan rule async stix_core_object complete", {"id": element_id}
        )

    def clear_access_restriction(self, **kwargs):
        """Ask clear restriction on a Stix-Core-Object.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        """
        element_id = kwargs.get("element_id", None)
        if element_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot clear access restriction, missing parameters: id"
            )
            return None

        query = """
            mutation StixCoreObjectEdit($id: ID!) {
                stixCoreObjectEdit(id: $id) {
                    clearAccessRestriction {
                        id
                    }
                }
            }
        """
        self.opencti.query(
            query,
            {
                "id": element_id,
            },
        )

    def ask_enrichment(self, **kwargs):
        """Ask enrichment with a single connector.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param connector_id: the connector id
        :type connector_id: str
        """
        element_id = kwargs.get("element_id", None)
        connector_id = kwargs.get("connector_id", None)
        query = """
            mutation StixCoreObjectEdit($id: ID!, $connectorId: ID!) {
                stixCoreObjectEdit(id: $id) {
                    askEnrichment(connectorId: $connectorId) {
                        id
                    }
                }
            }
        """
        self.opencti.query(
            query,
            {
                "id": element_id,
                "connectorId": connector_id,
            },
        )

    def ask_enrichments(self, **kwargs):
        """Ask enrichment with multiple connectors.

        :param element_id: the Stix-Core-Object id
        :type element_id: str
        :param connector_ids: list of connector ids
        :type connector_ids: list
        """
        element_id = kwargs.get("element_id", None)
        connector_ids = kwargs.get("connector_ids", None)
        query = """
            mutation StixCoreObjectEdit($id: ID!, $connectorIds: [ID!]!) {
                stixCoreObjectEdit(id: $id) {
                    askEnrichments(connectorIds: $connectorIds) {
                        id
                    }
                }
            }
        """
        self.opencti.query(
            query,
            {
                "id": element_id,
                "connectorIds": connector_ids,
            },
        )

    def organization_share(self, entity_id, organization_ids, sharing_direct_container):
        """Share element to multiple organizations.

        :param entity_id: the Stix-Core-Object id
        :type entity_id: str
        :param organization_ids: list of organization ids to share with
        :type organization_ids: list
        :param sharing_direct_container: whether to share direct containers
        :type sharing_direct_container: bool
        """
        query = """
            mutation StixCoreObjectEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) {
                stixCoreObjectEdit(id: $id) {
                    restrictionOrganizationAdd(organizationId: $organizationId, directContainerSharing: $directContainerSharing) {
                        id
                    }
                }
            }
        """
        self.opencti.query(
            query,
            {
                "id": entity_id,
                "organizationId": organization_ids,
                "directContainerSharing": sharing_direct_container,
            },
        )

    def organization_unshare(
        self, entity_id, organization_ids, sharing_direct_container
    ):
        """Unshare element from multiple organizations.

        :param entity_id: the Stix-Core-Object id
        :type entity_id: str
        :param organization_ids: list of organization ids to unshare from
        :type organization_ids: list
        :param sharing_direct_container: whether to unshare direct containers
        :type sharing_direct_container: bool
        """
        query = """
            mutation StixCoreObjectEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) {
                stixCoreObjectEdit(id: $id) {
                    restrictionOrganizationDelete(organizationId: $organizationId, directContainerSharing: $directContainerSharing) {
                        id
                    }
                }
            }
        """
        self.opencti.query(
            query,
            {
                "id": entity_id,
                "organizationId": organization_ids,
                "directContainerSharing": sharing_direct_container,
            },
        )

    def delete(self, **kwargs):
        """Delete a Stix-Core-Object object.

        :param id: the Stix-Core-Object id
        :type id: str
        """
        entity_id = kwargs.get("id", None)
        if entity_id is None:
            self.opencti.app_logger.error(
                "[opencti_stix_core_object] Missing parameters: id"
            )
            return None

        self.opencti.app_logger.info("Deleting stix_core_object", {"id": entity_id})
        query = """
            mutation StixCoreObjectEdit($id: ID!) {
                stixCoreObjectEdit(id: $id) {
                    delete
                }
            }
        """
        self.opencti.query(query, {"id": entity_id})

    def remove_from_draft(self, **kwargs):
        """Remove a Stix-Core-Object object from draft (revert).

        :param id: the Stix-Core-Object id
        :type id: str
        """
        entity_id = kwargs.get("id", None)
        if entity_id is None:
            self.opencti.app_logger.error(
                "[stix_core_object] Cannot remove from draft, missing parameters: id"
            )
            return None

        self.opencti.app_logger.info(
            "Draft remove stix_core_object", {"id": entity_id}
        )
        query = """
            mutation StixCoreObjectEditDraftRemove($id: ID!) {
                stixCoreObjectEdit(id: $id) {
                    removeFromDraft
                }
            }
        """
        self.opencti.query(query, {"id": entity_id})