# Source code for pycti.entities.opencti_stix_object_or_stix_relationship
import json
# [docs]
class StixObjectOrStixRelationship:
    """Main StixObjectOrStixRelationship class for OpenCTI

    Manages generic STIX objects and relationships in the OpenCTI platform.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    def __init__(self, opencti):
        """Initialize the StixObjectOrStixRelationship instance.

        Stores the API client and defines ``self.properties``, the default
        GraphQL selection set used by :meth:`read` and :meth:`list` when the
        caller does not supply ``customAttributes``.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        # read() and list() reach the API and the logger through this
        # attribute, so it has to be set before either of them is called.
        self.opencti = opencti
        # The GraphQL text below is runtime data and is kept verbatim.
        self.properties = """
... on StixObject {
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
}
... on StixDomainObject {
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectOrganization {
id
standard_id
name
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
}
}
}
revoked
confidence
created
modified
}
... on AttackPattern {
name
description
aliases
x_mitre_platforms
x_mitre_permissions_required
x_mitre_detection
x_mitre_id
killChainPhases {
id
standard_id
entity_type
kill_chain_name
phase_name
x_opencti_order
created
modified
}
}
... on Campaign {
name
description
aliases
first_seen
last_seen
objective
}
... on Note {
attribute_abstract
content
authors
note_types
likelihood
}
... on ObservedData {
first_observed
last_observed
number_observed
}
... on Opinion {
explanation
authors
opinion
}
... on Report {
name
description
report_types
published
}
... on CourseOfAction {
name
description
x_opencti_aliases
}
... on Individual {
name
description
contact_information
x_opencti_aliases
x_opencti_firstname
x_opencti_lastname
}
... on Organization {
name
description
contact_information
x_opencti_aliases
x_opencti_organization_type
x_opencti_reliability
}
... on Sector {
name
description
contact_information
x_opencti_aliases
}
... on System {
name
description
contact_information
x_opencti_aliases
}
... on Indicator {
pattern_type
pattern_version
pattern
name
description
indicator_types
valid_from
valid_until
x_opencti_score
x_opencti_detection
x_opencti_main_observable_type
}
... on Infrastructure {
name
description
aliases
infrastructure_types
first_seen
last_seen
}
... on IntrusionSet {
name
description
aliases
first_seen
last_seen
goals
resource_level
primary_motivation
secondary_motivations
}
... on City {
name
description
latitude
longitude
precision
x_opencti_aliases
}
... on Country {
name
description
latitude
longitude
precision
x_opencti_aliases
}
... on Region {
name
description
latitude
longitude
precision
x_opencti_aliases
}
... on Position {
name
description
latitude
longitude
precision
x_opencti_aliases
street_address
postal_code
}
... on Malware {
name
description
aliases
malware_types
is_family
first_seen
last_seen
architecture_execution_envs
implementation_languages
capabilities
killChainPhases {
id
standard_id
entity_type
kill_chain_name
phase_name
x_opencti_order
created
modified
}
}
... on ThreatActor {
name
description
aliases
threat_actor_types
first_seen
last_seen
roles
goals
sophistication
resource_level
primary_motivation
secondary_motivations
personal_motivations
}
... on Tool {
name
description
aliases
tool_types
tool_version
killChainPhases {
id
standard_id
entity_type
kill_chain_name
phase_name
x_opencti_order
created
modified
}
}
... on Vulnerability {
name
description
x_opencti_aliases
x_opencti_cvss_vector_string
x_opencti_cvss_base_score
x_opencti_cvss_base_severity
x_opencti_cvss_attack_vector
x_opencti_cvss_attack_complexity
x_opencti_cvss_privileges_required
x_opencti_cvss_user_interaction
x_opencti_cvss_scope
x_opencti_cvss_confidentiality_impact
x_opencti_cvss_integrity_impact
x_opencti_cvss_availability_impact
x_opencti_cvss_exploit_code_maturity
x_opencti_cvss_remediation_level
x_opencti_cvss_report_confidence
x_opencti_cvss_temporal_score
x_opencti_cvss_v2_vector_string
x_opencti_cvss_v2_base_score
x_opencti_cvss_v2_access_vector
x_opencti_cvss_v2_access_complexity
x_opencti_cvss_v2_authentication
x_opencti_cvss_v2_confidentiality_impact
x_opencti_cvss_v2_integrity_impact
x_opencti_cvss_v2_availability_impact
x_opencti_cvss_v2_exploitability
x_opencti_cvss_v2_remediation_level
x_opencti_cvss_v2_report_confidence
x_opencti_cvss_v2_temporal_score
x_opencti_cvss_v4_vector_string
x_opencti_cvss_v4_base_score
x_opencti_cvss_v4_base_severity
x_opencti_cvss_v4_attack_vector
x_opencti_cvss_v4_attack_complexity
x_opencti_cvss_v4_attack_requirements
x_opencti_cvss_v4_privileges_required
x_opencti_cvss_v4_user_interaction
x_opencti_cvss_v4_confidentiality_impact_v
x_opencti_cvss_v4_confidentiality_impact_s
x_opencti_cvss_v4_integrity_impact_v
x_opencti_cvss_v4_integrity_impact_s
x_opencti_cvss_v4_availability_impact_v
x_opencti_cvss_v4_availability_impact_s
x_opencti_cvss_v4_exploit_maturity
x_opencti_cwe
x_opencti_cisa_kev
x_opencti_epss_score
x_opencti_epss_percentile
x_opencti_score
}
... on Incident {
name
description
aliases
first_seen
last_seen
objective
}
... on Event {
name
description
}
... on Channel {
name
description
}
... on Narrative {
name
description
}
... on Language {
name
}
... on DataComponent {
name
description
}
... on DataSource {
name
description
}
... on Case {
name
}
... on StixCyberObservable {
observable_value
}
... on StixCoreRelationship {
id
standard_id
entity_type
parent_types
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
name
x_opencti_aliases
description
created
modified
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
}
}
}
revoked
confidence
created
modified
description
start_time
stop_time
}
... on StixSightingRelationship {
id
standard_id
entity_type
parent_types
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
name
x_opencti_aliases
description
created
modified
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
}
}
}
confidence
created
modified
description
attribute_count
x_opencti_negative
first_seen
last_seen
}
"""

    def read(self, **kwargs):
        """Read a StixObjectOrStixRelationship object.

        When ``id`` is given it takes precedence and ``filters`` is ignored.
        Otherwise, when ``filters`` is given, the first element returned by
        :meth:`list` for those filters is returned. With neither, an error is
        logged and ``None`` is returned.

        :param id: the id of the StixObjectOrStixRelationship
        :type id: str
        :param customAttributes: custom attributes to return (GraphQL
            selection set used instead of ``self.properties``)
        :type customAttributes: str
        :param filters: the filters to apply
        :type filters: dict
        :return: StixObjectOrStixRelationship object
        :rtype: dict or None
        """
        # Local name avoids shadowing the ``id`` builtin; the kwarg key is unchanged.
        entity_id = kwargs.get("id", None)
        custom_attributes = kwargs.get("customAttributes", None)
        filters = kwargs.get("filters", None)

        if entity_id is not None:
            self.opencti.app_logger.info(
                "Reading StixObjectOrStixRelationship", {"id": entity_id}
            )
            selection = (
                custom_attributes
                if custom_attributes is not None
                else self.properties
            )
            # The selection set is spliced between the two halves of the query.
            query = (
                """
query StixObjectOrStixRelationship($id: String!) {
stixObjectOrStixRelationship(id: $id) {
"""
                + selection
                + """
}
}
"""
            )
            result = self.opencti.query(query, {"id": entity_id})
            return self.opencti.process_multiple_fields(
                result["data"]["stixObjectOrStixRelationship"]
            )

        if filters is not None:
            # Forward customAttributes so the filter lookup honours the same
            # selection set as the id lookup.
            result = self.list(filters=filters, customAttributes=custom_attributes)
            return result[0] if result else None

        self.opencti.app_logger.error("Missing parameters: id")
        return None

    def list(self, **kwargs):
        """List StixObjectOrStixRelationship objects.

        :param filters: the filters to apply
        :type filters: dict
        :param search: the search keyword
        :type search: str
        :param first: return the first n rows from the after ID (or the beginning if not set)
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param getAll: whether to retrieve all results; when set, every page
            is fetched and ``with_pagination`` is not used
        :type getAll: bool
        :param with_pagination: whether to include pagination info; passed
            through to the client's ``process_multiple`` when ``getAll`` is
            not set
        :type with_pagination: bool
        :param customAttributes: custom attributes to return (GraphQL
            selection set used instead of ``self.properties``)
        :type customAttributes: str
        :return: List of StixObjectOrStixRelationship objects. The exact
            shape when ``with_pagination`` is set is decided by the client's
            ``process_multiple`` (not visible here).
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        search = kwargs.get("search", None)
        first = kwargs.get("first", 100)
        after = kwargs.get("after", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("with_pagination", False)
        custom_attributes = kwargs.get("customAttributes", None)

        self.opencti.app_logger.info(
            "Listing StixObjectOrStixRelationships with filters",
            {"filters": json.dumps(filters)},
        )
        selection = (
            custom_attributes if custom_attributes is not None else self.properties
        )
        # The selection set is spliced between the two halves of the query.
        query = (
            """
query StixObjectOrStixRelationships($filters: FilterGroup, $search: String, $first: Int, $after: ID) {
stixObjectOrStixRelationships(filters: $filters, search: $search, first: $first, after: $after) {
edges {
node {
"""
            + selection
            + """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
        )
        variables = {
            "filters": filters,
            "search": search,
            "first": first,
            "after": after,
        }
        result = self.opencti.query(query, variables)
        connection = result["data"]["stixObjectOrStixRelationships"]

        if not get_all:
            return self.opencti.process_multiple(connection, with_pagination)

        # Accumulate in place; rebuilding the list with ``+`` on every page
        # would copy all previously collected items each time.
        final_data = []
        final_data.extend(self.opencti.process_multiple(connection))
        while connection["pageInfo"]["hasNextPage"]:
            after = connection["pageInfo"]["endCursor"]
            self.opencti.app_logger.debug(
                "Listing stixObjectOrStixRelationships", {"after": after}
            )
            # Same variables as the first request, only the cursor moves.
            result = self.opencti.query(query, {**variables, "after": after})
            connection = result["data"]["stixObjectOrStixRelationships"]
            final_data.extend(self.opencti.process_multiple(connection))
        return final_data