# coding: utf-8
import datetime
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
[docs]
class StixCoreRelationship:
"""Main StixCoreRelationship class for OpenCTI
Manages STIX relationships between entities in the OpenCTI platform.
:param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type opencti: OpenCTIApiClient
"""
[docs]
def __init__(self, opencti):
"""Initialize the StixCoreRelationship instance.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.properties = """
id
entity_type
parent_types
spec_version
created_at
updated_at
standard_id
relationship_type
description
start_time
stop_time
revoked
confidence
lang
created
modified
status {
id
template {
id
name
color
}
}
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectOrganization {
id
standard_id
name
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
from {
... on BasicObject {
id
entity_type
parent_types
}
... on BasicRelationship {
id
entity_type
parent_types
}
... on StixObject {
standard_id
spec_version
created_at
updated_at
}
... on AttackPattern {
name
}
... on Campaign {
name
}
... on CourseOfAction {
name
}
... on Individual {
name
}
... on Organization {
name
}
... on Sector {
name
}
... on System {
name
}
... on Indicator {
name
}
... on Infrastructure {
name
}
... on IntrusionSet {
name
}
... on Position {
name
}
... on City {
name
}
... on Country {
name
}
... on Region {
name
}
... on Malware {
name
}
... on ThreatActor {
name
}
... on Tool {
name
}
... on Vulnerability {
name
}
... on Incident {
name
}
... on Event {
name
description
}
... on Channel {
name
description
}
... on Narrative {
name
description
}
... on Language {
name
}
... on DataComponent {
name
description
}
... on DataSource {
name
description
}
... on Case {
name
}
... on StixCyberObservable {
observable_value
}
... on StixCoreRelationship {
standard_id
spec_version
created_at
updated_at
}
}
to {
... on BasicObject {
id
entity_type
parent_types
}
... on BasicRelationship {
id
entity_type
parent_types
}
... on StixObject {
standard_id
spec_version
created_at
updated_at
}
... on AttackPattern {
name
}
... on Campaign {
name
}
... on CourseOfAction {
name
}
... on Individual {
name
}
... on Organization {
name
}
... on Sector {
name
}
... on System {
name
}
... on Indicator {
name
}
... on Infrastructure {
name
}
... on IntrusionSet {
name
}
... on Position {
name
}
... on City {
name
}
... on Country {
name
}
... on Region {
name
}
... on Malware {
name
}
... on ThreatActor {
name
}
... on Tool {
name
}
... on Vulnerability {
name
}
... on Incident {
name
}
... on Event {
name
description
}
... on Channel {
name
description
}
... on Narrative {
name
description
}
... on Language {
name
}
... on DataComponent {
name
description
}
... on DataSource {
name
description
}
... on Case {
name
}
... on StixCyberObservable {
observable_value
}
... on StixCoreRelationship {
standard_id
spec_version
created_at
updated_at
}
}
"""
@staticmethod
[docs]
def generate_id(
relationship_type, source_ref, target_ref, start_time=None, stop_time=None
):
"""Generate a STIX ID for a relationship.
:param relationship_type: The type of relationship
:type relationship_type: str
:param source_ref: The source entity reference ID
:type source_ref: str
:param target_ref: The target entity reference ID
:type target_ref: str
:param start_time: (optional) The start time of the relationship
:type start_time: str or datetime.datetime or None
:param stop_time: (optional) The stop time of the relationship
:type stop_time: str or datetime.datetime or None
:return: STIX ID for the relationship
:rtype: str
"""
if isinstance(start_time, datetime.datetime):
start_time = start_time.isoformat()
if isinstance(stop_time, datetime.datetime):
stop_time = stop_time.isoformat()
if start_time is not None and stop_time is not None:
data = {
"relationship_type": relationship_type,
"source_ref": source_ref,
"target_ref": target_ref,
"start_time": start_time,
"stop_time": stop_time,
}
elif start_time is not None:
data = {
"relationship_type": relationship_type,
"source_ref": source_ref,
"target_ref": target_ref,
"start_time": start_time,
}
else:
data = {
"relationship_type": relationship_type,
"source_ref": source_ref,
"target_ref": target_ref,
}
data = canonicalize(data, utf8=False)
id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data))
return "relationship--" + id
@staticmethod
[docs]
def generate_id_from_data(data):
"""Generate a STIX ID from relationship data.
:param data: Dictionary containing relationship_type, source_ref, target_ref, and optionally start_time/stop_time
:type data: dict
:return: STIX ID for the relationship
:rtype: str
"""
return StixCoreRelationship.generate_id(
data["relationship_type"],
data["source_ref"],
data["target_ref"],
data.get("start_time"),
data.get("stop_time"),
)
[docs]
def list(self, **kwargs):
"""List stix_core_relationship objects.
:param fromOrToId: the id of an entity (source or target)
:type fromOrToId: str
:param elementWithTargetTypes: filter by target types
:type elementWithTargetTypes: list
:param fromId: the id of the source entity of the relation
:type fromId: str
:param fromTypes: filter by source entity types
:type fromTypes: list
:param toId: the id of the target entity of the relation
:type toId: str
:param toTypes: filter by target entity types
:type toTypes: list
:param relationship_type: the relation type
:type relationship_type: str
:param startTimeStart: the start_time date start filter
:type startTimeStart: str
:param startTimeStop: the start_time date stop filter
:type startTimeStop: str
:param stopTimeStart: the stop_time date start filter
:type stopTimeStart: str
:param stopTimeStop: the stop_time date stop filter
:type stopTimeStop: str
:param filters: additional filters to apply
:type filters: dict
:param first: return the first n rows from the after ID (or the beginning if not set)
:type first: int
:param after: ID of the first row for pagination
:type after: str
:param orderBy: field to order results by
:type orderBy: str
:param orderMode: ordering mode (asc/desc)
:type orderMode: str
:param customAttributes: custom attributes to return
:type customAttributes: str
:param getAll: whether to retrieve all results
:type getAll: bool
:param withPagination: whether to include pagination info
:type withPagination: bool
:param search: search keyword
:type search: str
:return: List of stix_core_relationship objects
:rtype: list
"""
from_or_to_id = kwargs.get("fromOrToId", None)
element_with_target_types = kwargs.get("elementWithTargetTypes", None)
from_id = kwargs.get("fromId", None)
from_types = kwargs.get("fromTypes", None)
to_id = kwargs.get("toId", None)
to_types = kwargs.get("toTypes", None)
relationship_type = kwargs.get("relationship_type", None)
start_time_start = kwargs.get("startTimeStart", None)
start_time_stop = kwargs.get("startTimeStop", None)
stop_time_start = kwargs.get("stopTimeStart", None)
stop_time_stop = kwargs.get("stopTimeStop", None)
filters = kwargs.get("filters", None)
first = kwargs.get("first", 100)
after = kwargs.get("after", None)
order_by = kwargs.get("orderBy", None)
order_mode = kwargs.get("orderMode", None)
custom_attributes = kwargs.get("customAttributes", None)
get_all = kwargs.get("getAll", False)
with_pagination = kwargs.get("withPagination", False)
search = kwargs.get("search", None)
self.opencti.app_logger.info(
"Listing stix_core_relationships",
{
"relationship_type": relationship_type,
"from_or_to_id": from_or_to_id,
"from_id": from_id,
"to_id": to_id,
"element_with_target_types": element_with_target_types,
"from_types": from_types,
"to_types": to_types,
"search": search,
},
)
query = (
"""
query StixCoreRelationships($fromOrToId: [String], $elementWithTargetTypes: [String], $fromId: [String], $fromTypes: [String], $toId: [String], $toTypes: [String], $relationship_type: [String], $startTimeStart: DateTime, $startTimeStop: DateTime, $stopTimeStart: DateTime, $stopTimeStop: DateTime, $filters: FilterGroup, $first: Int, $after: ID, $orderBy: StixCoreRelationshipsOrdering, $orderMode: OrderingMode, $search: String) {
stixCoreRelationships(fromOrToId: $fromOrToId, elementWithTargetTypes: $elementWithTargetTypes, fromId: $fromId, fromTypes: $fromTypes, toId: $toId, toTypes: $toTypes, relationship_type: $relationship_type, startTimeStart: $startTimeStart, startTimeStop: $startTimeStop, stopTimeStart: $stopTimeStart, stopTimeStop: $stopTimeStop, filters: $filters, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, search: $search) {
edges {
node {
"""
+ (custom_attributes if custom_attributes is not None else self.properties)
+ """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
)
result = self.opencti.query(
query,
{
"fromOrToId": from_or_to_id,
"elementWithTargetTypes": element_with_target_types,
"fromId": from_id,
"fromTypes": from_types,
"toId": to_id,
"toTypes": to_types,
"relationship_type": relationship_type,
"startTimeStart": start_time_start,
"startTimeStop": start_time_stop,
"stopTimeStart": stop_time_start,
"stopTimeStop": stop_time_stop,
"filters": filters,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
"search": search,
},
)
if get_all:
final_data = []
data = self.opencti.process_multiple(
result["data"]["stixCoreRelationships"]
)
final_data = final_data + data
while result["data"]["stixCoreRelationships"]["pageInfo"]["hasNextPage"]:
after = result["data"]["stixCoreRelationships"]["pageInfo"]["endCursor"]
self.opencti.app_logger.debug(
"Listing StixCoreRelationships", {"after": after}
)
result = self.opencti.query(
query,
{
"fromOrToId": from_or_to_id,
"elementWithTargetTypes": element_with_target_types,
"fromId": from_id,
"fromTypes": from_types,
"toId": to_id,
"toTypes": to_types,
"relationship_type": relationship_type,
"startTimeStart": start_time_start,
"startTimeStop": start_time_stop,
"stopTimeStart": stop_time_start,
"stopTimeStop": stop_time_stop,
"filters": filters,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
"search": search,
},
)
data = self.opencti.process_multiple(
result["data"]["stixCoreRelationships"]
)
final_data = final_data + data
return final_data
else:
return self.opencti.process_multiple(
result["data"]["stixCoreRelationships"], with_pagination
)
[docs]
def read(self, **kwargs):
"""Read a stix_core_relationship object.
:param id: the id of the stix_core_relationship
:type id: str
:param fromOrToId: the id of an entity (source or target)
:type fromOrToId: str
:param fromId: the id of the source entity of the relation
:type fromId: str
:param toId: the id of the target entity of the relation
:type toId: str
:param relationship_type: the relation type
:type relationship_type: str
:param startTimeStart: the start_time date start filter
:type startTimeStart: str
:param startTimeStop: the start_time date stop filter
:type startTimeStop: str
:param stopTimeStart: the stop_time date start filter
:type stopTimeStart: str
:param stopTimeStop: the stop_time date stop filter
:type stopTimeStop: str
:param filters: filters to apply
:type filters: dict
:param customAttributes: custom attributes to return
:type customAttributes: str
:return: stix_core_relationship object
:rtype: dict or None
"""
id = kwargs.get("id", None)
from_or_to_id = kwargs.get("fromOrToId", None)
from_id = kwargs.get("fromId", None)
to_id = kwargs.get("toId", None)
relationship_type = kwargs.get("relationship_type", None)
start_time_start = kwargs.get("startTimeStart", None)
start_time_stop = kwargs.get("startTimeStop", None)
stop_time_start = kwargs.get("stopTimeStart", None)
stop_time_stop = kwargs.get("stopTimeStop", None)
filters = kwargs.get("filters", None)
custom_attributes = kwargs.get("customAttributes", None)
if id is not None:
self.opencti.app_logger.info("Reading stix_core_relationship", {"id": id})
query = (
"""
query StixCoreRelationship($id: String!) {
stixCoreRelationship(id: $id) {
"""
+ (
custom_attributes
if custom_attributes is not None
else self.properties
)
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(
result["data"]["stixCoreRelationship"]
)
elif filters is not None:
result = self.list(filters=filters, customAttributes=custom_attributes)
if len(result) > 0:
return result[0]
else:
return None
elif from_id is not None and to_id is not None:
result = self.list(
fromOrToId=from_or_to_id,
fromId=from_id,
toId=to_id,
relationship_type=relationship_type,
startTimeStart=start_time_start,
startTimeStop=start_time_stop,
stopTimeStart=stop_time_start,
stopTimeStop=stop_time_stop,
)
if len(result) > 0:
return result[0]
else:
return None
else:
self.opencti.app_logger.error("Missing parameters: id or from_id and to_id")
return None
[docs]
def create(self, **kwargs):
"""Create a stix_core_relationship object.
:param fromId: the id of the source entity
:type fromId: str
:param toId: the id of the target entity
:type toId: str
:param stix_id: (optional) the STIX ID
:type stix_id: str
:param relationship_type: the type of relationship
:type relationship_type: str
:param description: (optional) description
:type description: str
:param start_time: (optional) start time of the relationship
:type start_time: str
:param stop_time: (optional) stop time of the relationship
:type stop_time: str
:param revoked: (optional) whether the relationship is revoked
:type revoked: bool
:param confidence: (optional) confidence level (0-100)
:type confidence: int
:param lang: (optional) language
:type lang: str
:param created: (optional) creation date
:type created: str
:param modified: (optional) modification date
:type modified: str
:param createdBy: (optional) the author ID
:type createdBy: str
:param objectMarking: (optional) list of marking definition IDs
:type objectMarking: list
:param objectLabel: (optional) list of label IDs
:type objectLabel: list
:param externalReferences: (optional) list of external reference IDs
:type externalReferences: list
:param killChainPhases: (optional) list of kill chain phase IDs
:type killChainPhases: list
:param objectOrganization: (optional) list of organization IDs
:type objectOrganization: list
:param x_opencti_workflow_id: (optional) workflow ID
:type x_opencti_workflow_id: str
:param x_opencti_stix_ids: (optional) list of additional STIX IDs
:type x_opencti_stix_ids: list
:param x_opencti_modified_at: (optional) custom modification date
:type x_opencti_modified_at: str
:param coverage_information: (optional) coverage information
:type coverage_information: list
:param update: (optional) whether to update if exists (default: False)
:type update: bool
:return: stix_core_relationship object
:rtype: dict or None
"""
from_id = kwargs.get("fromId", None)
to_id = kwargs.get("toId", None)
stix_id = kwargs.get("stix_id", None)
relationship_type = kwargs.get("relationship_type", None)
description = kwargs.get("description", None)
start_time = kwargs.get("start_time", None)
stop_time = kwargs.get("stop_time", None)
revoked = kwargs.get("revoked", None)
confidence = kwargs.get("confidence", None)
lang = kwargs.get("lang", None)
created = kwargs.get("created", None)
modified = kwargs.get("modified", None)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
external_references = kwargs.get("externalReferences", None)
kill_chain_phases = kwargs.get("killChainPhases", None)
granted_refs = kwargs.get("objectOrganization", None)
x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
x_opencti_modified_at = kwargs.get("x_opencti_modified_at", None)
coverage_information = kwargs.get("coverage_information", None)
update = kwargs.get("update", False)
upsert_operations = kwargs.get("upsert_operations", None)
self.opencti.app_logger.info(
"Creating stix_core_relationship",
{
"relationship_type": relationship_type,
"from_id": from_id,
"to_id": to_id,
},
)
query = """
mutation StixCoreRelationshipAdd($input: StixCoreRelationshipAddInput!) {
stixCoreRelationshipAdd(input: $input) {
id
standard_id
entity_type
parent_types
}
}
"""
result = self.opencti.query(
query,
{
"input": {
"fromId": from_id,
"toId": to_id,
"stix_id": stix_id,
"relationship_type": relationship_type,
"description": description,
"start_time": start_time,
"stop_time": stop_time,
"revoked": revoked,
"confidence": confidence,
"lang": lang,
"created": created,
"modified": modified,
"createdBy": created_by,
"objectMarking": object_marking,
"objectLabel": object_label,
"objectOrganization": granted_refs,
"externalReferences": external_references,
"killChainPhases": kill_chain_phases,
"x_opencti_workflow_id": x_opencti_workflow_id,
"x_opencti_stix_ids": x_opencti_stix_ids,
"x_opencti_modified_at": x_opencti_modified_at,
"coverage_information": coverage_information,
"update": update,
"upsertOperations": upsert_operations,
}
},
)
return self.opencti.process_multiple_fields(
result["data"]["stixCoreRelationshipAdd"]
)
[docs]
def update_field(self, **kwargs):
"""Update a stix_core_relationship object field.
:param id: the stix_core_relationship id
:type id: str
:param input: the input of the field
:type input: list
:return: The updated stix_core_relationship object
:rtype: dict or None
"""
id = kwargs.get("id", None)
input = kwargs.get("input", None)
if id is not None and input is not None:
self.opencti.app_logger.info("Updating stix_core_relationship", {"id": id})
query = """
mutation StixCoreRelationshipEdit($id: ID!, $input: [EditInput]!) {
stixCoreRelationshipEdit(id: $id) {
fieldPatch(input: $input) {
id
standard_id
entity_type
}
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"input": input,
},
)
return self.opencti.process_multiple_fields(
result["data"]["stixCoreRelationshipEdit"]["fieldPatch"]
)
else:
self.opencti.app_logger.error(
"[opencti_stix_core_relationship] Missing parameters: id and input",
)
return None
[docs]
def delete(self, **kwargs):
"""Delete a stix_core_relationship.
:param id: the stix_core_relationship id
:type id: str
:return: None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info("Deleting stix_core_relationship", {"id": id})
query = """
mutation StixCoreRelationshipEdit($id: ID!) {
stixCoreRelationshipEdit(id: $id) {
delete
}
}
"""
self.opencti.query(query, {"id": id})
else:
self.opencti.app_logger.error(
"[opencti_stix_core_relationship] Missing parameters: id"
)
return None
[docs]
def add_marking_definition(self, **kwargs):
"""Add a Marking-Definition object to stix_core_relationship object (object_marking_refs).
:param id: the id of the stix_core_relationship
:type id: str
:param marking_definition_id: the id of the Marking-Definition
:type marking_definition_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
marking_definition_id = kwargs.get("marking_definition_id", None)
if id is not None and marking_definition_id is not None:
custom_attributes = """
id
objectMarking {
id
standard_id
entity_type
definition_type
definition
x_opencti_order
x_opencti_color
created
modified
}
"""
stix_core_relationship = self.read(
id=id, customAttributes=custom_attributes
)
if stix_core_relationship is None:
self.opencti.app_logger.error(
"Cannot add Marking-Definition, entity not found"
)
return False
if marking_definition_id in stix_core_relationship["objectMarkingIds"]:
return True
else:
self.opencti.app_logger.info(
"Adding Marking-Definition to Stix-Domain-Object",
{"id": id, "marking_definition_id": marking_definition_id},
)
query = """
mutation StixCoreRelationshipAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCoreRelationshipEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": marking_definition_id,
"relationship_type": "object-marking",
},
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and marking_definition_id"
)
return False
[docs]
def remove_marking_definition(self, **kwargs):
"""Remove a Marking-Definition object from stix_core_relationship.
:param id: the id of the stix_core_relationship
:type id: str
:param marking_definition_id: the id of the Marking-Definition
:type marking_definition_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
marking_definition_id = kwargs.get("marking_definition_id", None)
if id is not None and marking_definition_id is not None:
self.opencti.app_logger.info(
"Removing Marking-Definition from stix_core_relationship",
{"id": id, "marking_definition_id": marking_definition_id},
)
query = """
mutation StixCoreRelationshipRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCoreRelationshipEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": marking_definition_id,
"relationship_type": "object-marking",
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and marking_definition_id"
)
return False
[docs]
def add_label(self, **kwargs):
"""Add a Label object to stix_core_relationship (labeling).
:param id: the id of the stix_core_relationship
:type id: str
:param label_id: the id of the Label
:type label_id: str
:param label_name: (optional) the name of the Label (will create if not exists)
:type label_name: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
label_id = kwargs.get("label_id", None)
label_name = kwargs.get("label_name", None)
if label_name is not None:
label = self.opencti.label.read(
filters={
"mode": "and",
"filters": [{"key": "value", "values": [label_name]}],
"filterGroups": [],
}
)
if label:
label_id = label["id"]
else:
label = self.opencti.label.create(value=label_name)
label_id = label["id"]
if id is not None and label_id is not None:
self.opencti.app_logger.info(
"Adding label to stix-core-relationship",
{"label_id": label_id, "id": id},
)
query = """
mutation StixCoreRelationshipAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCoreRelationshipEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": label_id,
"relationship_type": "object-label",
},
},
)
return True
else:
self.opencti.app_logger.error("Missing parameters: id and label_id")
return False
[docs]
def remove_label(self, **kwargs):
"""Remove a Label object from stix_core_relationship.
:param id: the id of the stix_core_relationship
:type id: str
:param label_id: the id of the Label
:type label_id: str
:param label_name: (optional) the name of the Label
:type label_name: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
label_id = kwargs.get("label_id", None)
label_name = kwargs.get("label_name", None)
if label_name is not None:
label = self.opencti.label.read(
filters={
"mode": "and",
"filters": [{"key": "value", "values": [label_name]}],
"filterGroups": [],
}
)
if label:
label_id = label["id"]
if id is not None and label_id is not None:
self.opencti.app_logger.info(
"Removing label from stix_core_relationship",
{"label_id": label_id, "id": id},
)
query = """
mutation StixCoreRelationshipRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCoreRelationshipEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": label_id,
"relationship_type": "object-label",
},
)
return True
else:
self.opencti.app_logger.error("Missing parameters: id and label_id")
return False
[docs]
def add_external_reference(self, **kwargs):
"""Add an External-Reference object to stix_core_relationship.
:param id: the id of the stix_core_relationship
:type id: str
:param external_reference_id: the id of the External-Reference
:type external_reference_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
external_reference_id = kwargs.get("external_reference_id", None)
if id is not None and external_reference_id is not None:
self.opencti.app_logger.info(
"Adding External-Reference to stix-core-relationship",
{"external_reference_id": external_reference_id, "id": id},
)
query = """
mutation StixCoreRelationshipEditRelationAdd($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCoreRelationshipEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": external_reference_id,
"relationship_type": "external-reference",
},
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and external_reference_id"
)
return False
[docs]
def remove_external_reference(self, **kwargs):
"""Remove an External-Reference object from stix_core_relationship.
:param id: the id of the stix_core_relationship
:type id: str
:param external_reference_id: the id of the External-Reference
:type external_reference_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
external_reference_id = kwargs.get("external_reference_id", None)
if id is not None and external_reference_id is not None:
self.opencti.app_logger.info(
"Removing External-Reference from stix_core_relationship",
{"external_reference_id": external_reference_id, "id": id},
)
query = """
mutation StixCoreRelationshipRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCoreRelationshipEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": external_reference_id,
"relationship_type": "external-reference",
},
)
return True
else:
self.opencti.app_logger.error(
"Missing parameters: id and external_reference_id"
)
return False
[docs]
def add_kill_chain_phase(self, **kwargs):
"""Add a Kill-Chain-Phase object to stix_core_relationship object (kill_chain_phases).
:param id: the id of the stix_core_relationship
:type id: str
:param kill_chain_phase_id: the id of the Kill-Chain-Phase
:type kill_chain_phase_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
kill_chain_phase_id = kwargs.get("kill_chain_phase_id", None)
if id is not None and kill_chain_phase_id is not None:
self.opencti.app_logger.info(
"Adding Kill-Chain-Phase to stix-core-relationship",
{"kill_chain_phase_id": kill_chain_phase_id, "id": id},
)
query = """
mutation StixCoreRelationshipAddRelation($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCoreRelationshipEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"input": {
"toId": kill_chain_phase_id,
"relationship_type": "kill-chain-phase",
},
},
)
return True
else:
self.opencti.app_logger.error(
"[opencti_stix_core_relationship] Missing parameters: id and kill_chain_phase_id",
)
return False
[docs]
def remove_kill_chain_phase(self, **kwargs):
"""Remove a Kill-Chain-Phase object from stix_core_relationship.
:param id: the id of the stix_core_relationship
:type id: str
:param kill_chain_phase_id: the id of the Kill-Chain-Phase
:type kill_chain_phase_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
kill_chain_phase_id = kwargs.get("kill_chain_phase_id", None)
if id is not None and kill_chain_phase_id is not None:
self.opencti.app_logger.info(
"Removing Kill-Chain-Phase from stix_core_relationship",
{"kill_chain_phase_id": kill_chain_phase_id, "id": id},
)
query = """
mutation StixCoreRelationshipRemoveRelation($id: ID!, $toId: StixRef!, $relationship_type: String!) {
stixCoreRelationshipEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": kill_chain_phase_id,
"relationship_type": "kill-chain-phase",
},
)
return True
else:
self.opencti.app_logger.error(
"[stix_core_relationship] Missing parameters: id and kill_chain_phase_id"
)
return False
[docs]
def update_created_by(self, **kwargs):
"""Update the Identity author of a stix_core_relationship (created_by).
:param id: the id of the stix_core_relationship
:type id: str
:param identity_id: the id of the Identity
:type identity_id: str
:return: True if successful, False otherwise
:rtype: bool
"""
id = kwargs.get("id", None)
identity_id = kwargs.get("identity_id", None)
if id is not None:
self.opencti.app_logger.info(
"Updating author of stix_core_relationship with Identity",
{"id": id, "identity_id": identity_id},
)
custom_attributes = """
id
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
name
x_opencti_aliases
description
created
modified
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
"""
stix_domain_object = self.read(id=id, customAttributes=custom_attributes)
if stix_domain_object["createdBy"] is not None:
query = """
mutation StixCoreRelationshipEdit($id: ID!, $toId: StixRef! $relationship_type: String!) {
stixCoreRelationshipEdit(id: $id) {
relationDelete(toId: $toId, relationship_type: $relationship_type) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": id,
"toId": stix_domain_object["createdBy"]["id"],
"relationship_type": "created-by",
},
)
if identity_id is not None:
# Add the new relation
query = """
mutation StixCoreRelationshipEdit($id: ID!, $input: StixRefRelationshipAddInput!) {
stixCoreRelationshipEdit(id: $id) {
relationAdd(input: $input) {
id
}
}
}
"""
variables = {
"id": id,
"input": {
"toId": identity_id,
"relationship_type": "created-by",
},
}
self.opencti.query(query, variables)
else:
self.opencti.app_logger.error("Missing parameters: id")
return False
[docs]
def import_from_stix2(self, **kwargs):
"""Import a stix_core_relationship from a STIX2 object.
:param stixRelation: the STIX2 relationship object
:type stixRelation: dict
:param extras: extra parameters including created_by_id, object_marking_ids, etc.
:type extras: dict
:param update: whether to update if the entity already exists
:type update: bool
:param defaultDate: default date to use for start/stop times
:type defaultDate: str or bool
:return: stix_core_relationship object
:rtype: dict or None
"""
stix_relation = kwargs.get("stixRelation", None)
extras = kwargs.get("extras", {})
update = kwargs.get("update", False)
default_date = kwargs.get("defaultDate", False)
if stix_relation is not None:
# Search in extensions
if "x_opencti_stix_ids" not in stix_relation:
stix_relation["x_opencti_stix_ids"] = (
self.opencti.get_attribute_in_extension("stix_ids", stix_relation)
)
if "x_opencti_granted_refs" not in stix_relation:
stix_relation["x_opencti_granted_refs"] = (
self.opencti.get_attribute_in_extension(
"granted_refs", stix_relation
)
)
if "x_opencti_workflow_id" not in stix_relation:
stix_relation["x_opencti_workflow_id"] = (
self.opencti.get_attribute_in_extension(
"workflow_id", stix_relation
)
)
if "x_opencti_modified_at" not in stix_relation:
stix_relation["x_opencti_modified_at"] = (
self.opencti.get_attribute_in_extension(
"modified_at", stix_relation
)
)
if "opencti_upsert_operations" not in stix_relation:
stix_relation["opencti_upsert_operations"] = (
self.opencti.get_attribute_in_extension(
"opencti_upsert_operations", stix_relation
)
)
raw_coverages = (
stix_relation["coverage"] if "coverage" in stix_relation else []
)
coverage_information = list(
map(
lambda cov: {
"coverage_name": cov["name"],
"coverage_score": cov["score"],
},
raw_coverages,
)
)
source_ref = stix_relation["source_ref"]
target_ref = stix_relation["target_ref"]
return self.create(
fromId=source_ref,
toId=target_ref,
stix_id=stix_relation["id"],
relationship_type=stix_relation["relationship_type"],
description=(
self.opencti.stix2.convert_markdown(stix_relation["description"])
if "description" in stix_relation
else None
),
start_time=(
stix_relation["start_time"]
if "start_time" in stix_relation
else default_date
),
stop_time=(
stix_relation["stop_time"]
if "stop_time" in stix_relation
else default_date
),
coverage_information=coverage_information,
revoked=(
stix_relation["revoked"] if "revoked" in stix_relation else None
),
confidence=(
stix_relation["confidence"]
if "confidence" in stix_relation
else None
),
lang=stix_relation["lang"] if "lang" in stix_relation else None,
created=(
stix_relation["created"] if "created" in stix_relation else None
),
modified=(
stix_relation["modified"] if "modified" in stix_relation else None
),
createdBy=(
extras["created_by_id"] if "created_by_id" in extras else None
),
objectMarking=(
extras["object_marking_ids"]
if "object_marking_ids" in extras
else None
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
killChainPhases=(
extras["kill_chain_phases_ids"]
if "kill_chain_phases_ids" in extras
else None
),
objectOrganization=(
stix_relation["x_opencti_granted_refs"]
if "x_opencti_granted_refs" in stix_relation
else None
),
x_opencti_workflow_id=(
stix_relation["x_opencti_workflow_id"]
if "x_opencti_workflow_id" in stix_relation
else None
),
x_opencti_stix_ids=(
stix_relation["x_opencti_stix_ids"]
if "x_opencti_stix_ids" in stix_relation
else None
),
x_opencti_modified_at=(
stix_relation["x_opencti_modified_at"]
if "x_opencti_modified_at" in stix_relation
else None
),
update=update,
upsert_operations=(
stix_relation["opencti_upsert_operations"]
if "opencti_upsert_operations" in stix_relation
else None
),
)
else:
self.opencti.app_logger.error(
"[opencti_stix_core_relationship] Missing parameters: stixObject"
)
return None
[docs]
def organization_share(self, entity_id, organization_ids, sharing_direct_container):
"""Share element to multiple organizations.
:param entity_id: the stix_core_relationship id
:type entity_id: str
:param organization_ids: the organization IDs to share with
:type organization_ids: list
:param sharing_direct_container: whether to share direct container
:type sharing_direct_container: bool
:return: None
"""
query = """
mutation StixCoreRelationshipEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) {
stixCoreRelationshipEdit(id: $id) {
restrictionOrganizationAdd(organizationId: $organizationId, directContainerSharing: $directContainerSharing) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": entity_id,
"organizationId": organization_ids,
"directContainerSharing": sharing_direct_container,
},
)
[docs]
def organization_unshare(
self, entity_id, organization_ids, sharing_direct_container
):
"""Unshare element from multiple organizations.
:param entity_id: the stix_core_relationship id
:type entity_id: str
:param organization_ids: the organization IDs to unshare from
:type organization_ids: list
:param sharing_direct_container: whether to unshare direct container
:type sharing_direct_container: bool
:return: None
"""
query = """
mutation StixCoreRelationshipEdit($id: ID!, $organizationId: [ID!]!, $directContainerSharing: Boolean) {
stixCoreRelationshipEdit(id: $id) {
restrictionOrganizationDelete(organizationId: $organizationId, directContainerSharing: $directContainerSharing) {
id
}
}
}
"""
self.opencti.query(
query,
{
"id": entity_id,
"organizationId": organization_ids,
"directContainerSharing": sharing_direct_container,
},
)
[docs]
def remove_from_draft(self, **kwargs):
"""Remove a stix_core_relationship object from draft (revert).
:param id: the stix_core_relationship id
:type id: str
:return: None
"""
id = kwargs.get("id", None)
if id is not None:
self.opencti.app_logger.info(
"Draft remove stix_core_relationship", {"id": id}
)
query = """
mutation StixCoreRelationshipEditDraftRemove($id: ID!) {
stixCoreRelationshipEdit(id: $id) {
removeFromDraft
}
}
"""
self.opencti.query(query, {"id": id})
else:
self.opencti.app_logger.error(
"[stix_core_relationship] Cannot remove from draft, missing parameters: id"
)
return None