# coding: utf-8
import json
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
class MarkingDefinition:
    """Main MarkingDefinition class for OpenCTI.

    Manages marking definitions (TLP, statements) in the OpenCTI platform.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    # Fixed STIX IDs that OpenCTI assigns to the well-known TLP markings.
    # TLP:WHITE is the legacy name of TLP:CLEAR and shares its ID.
    _STATIC_TLP_IDS = {
        "TLP:CLEAR": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
        "TLP:WHITE": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
        "TLP:GREEN": "marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
        "TLP:AMBER": "marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
        "TLP:AMBER+STRICT": "marking-definition--826578e1-40ad-459f-bc73-ede076f81f37",
        "TLP:RED": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
    }

    def __init__(self, opencti):
        """Initialize the MarkingDefinition instance.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        # Every other method goes through ``self.opencti`` (query, logger,
        # result processing), so the client must be kept on the instance.
        self.opencti = opencti
        # Default GraphQL selection set requested for a marking definition.
        self.properties = """
            id
            standard_id
            entity_type
            parent_types
            definition_type
            definition
            x_opencti_order
            x_opencti_color
            created
            modified
            created_at
            updated_at
        """

    @staticmethod
    def generate_id(definition_type, definition):
        """Generate a STIX ID for a Marking Definition.

        :param definition_type: The type of marking (TLP, statement, etc.)
        :type definition_type: str
        :param definition: The definition value
        :type definition: str
        :return: STIX ID for the marking definition
        :rtype: str
        """
        # Handle static IDs from OpenCTI
        if definition_type == "TLP" and isinstance(definition, str):
            static_id = MarkingDefinition._STATIC_TLP_IDS.get(definition)
            if static_id is not None:
                return static_id
        # Generate IDs: UUIDv5 over the canonical JSON form of the two fields.
        data = {"definition_type": definition_type, "definition": definition}
        data = canonicalize(data, utf8=False)
        generated = str(
            uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)
        )
        return "marking-definition--" + generated

    @staticmethod
    def generate_id_from_data(data):
        """Generate a STIX ID from marking definition data.

        :param data: Dictionary containing 'definition_type' and 'definition' keys
        :type data: dict
        :return: STIX ID for the marking definition
        :rtype: str
        """
        return MarkingDefinition.generate_id(
            data["definition_type"], data["definition"]
        )

    def list(self, **kwargs):
        """List Marking-Definition objects.

        :param filters: the filters to apply
        :type filters: dict
        :param first: return the first n rows from the after ID (or the beginning if not set)
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param orderBy: field to order results by
        :type orderBy: str
        :param orderMode: ordering mode (asc/desc)
        :type orderMode: str
        :param customAttributes: custom attributes to return
        :type customAttributes: str
        :param withPagination: whether to include pagination info
        :type withPagination: bool
        :return: List of Marking-Definition objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        first = kwargs.get("first", 500)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        with_pagination = kwargs.get("withPagination", False)

        self.opencti.app_logger.info(
            "Listing Marking-Definitions with filters", {"filters": json.dumps(filters)}
        )
        # customAttributes, when given, replaces the default selection set
        # and is concatenated into the query text as-is.
        query = (
            """
            query MarkingDefinitions($filters: FilterGroup, $first: Int, $after: ID, $orderBy: MarkingDefinitionsOrdering, $orderMode: OrderingMode) {
                markingDefinitions(filters: $filters, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
                            """
            + (custom_attributes if custom_attributes is not None else self.properties)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
        """
        )
        result = self.opencti.query(
            query,
            {
                "filters": filters,
                "first": first,
                "after": after,
                "orderBy": order_by,
                "orderMode": order_mode,
            },
        )
        return self.opencti.process_multiple(
            result["data"]["markingDefinitions"], with_pagination
        )

    def read(self, **kwargs):
        """Read a Marking-Definition object.

        ``id`` takes precedence; ``filters`` is only used when no id is given,
        in which case the first match of :meth:`list` is returned.

        :param id: the id of the Marking-Definition
        :type id: str
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :return: Marking-Definition object
        :rtype: dict or None
        """
        marking_id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)

        if marking_id is not None:
            self.opencti.app_logger.info(
                "Reading Marking-Definition", {"id": marking_id}
            )
            query = (
                """
                query MarkingDefinition($id: String!) {
                    markingDefinition(id: $id) {
                        """
                + self.properties
                + """
                    }
                }
            """
            )
            result = self.opencti.query(query, {"id": marking_id})
            return self.opencti.process_multiple_fields(
                result["data"]["markingDefinition"]
            )

        if filters is not None:
            matches = self.list(filters=filters)
            return matches[0] if len(matches) > 0 else None

        self.opencti.app_logger.error(
            "[opencti_marking_definition] Missing parameters: id or filters"
        )
        return None

    def create(self, **kwargs):
        """Create a Marking-Definition object.

        :param stix_id: (optional) the STIX ID
        :type stix_id: str
        :param created: (optional) creation date
        :type created: datetime
        :param modified: (optional) modification date
        :type modified: datetime
        :param definition_type: the definition type (required)
        :type definition_type: str
        :param definition: the definition value (required)
        :type definition: str
        :param x_opencti_order: (optional) order (default: 0)
        :type x_opencti_order: int
        :param x_opencti_color: (optional) color
        :type x_opencti_color: str
        :param x_opencti_stix_ids: (optional) list of additional STIX IDs
        :type x_opencti_stix_ids: list
        :param update: (optional) whether to update if exists (default: False)
        :type update: bool
        :return: Marking-Definition object, or None when a required
            parameter is missing
        :rtype: dict or None
        """
        stix_id = kwargs.get("stix_id", None)
        created = kwargs.get("created", None)
        modified = kwargs.get("modified", None)
        definition_type = kwargs.get("definition_type", None)
        definition = kwargs.get("definition", None)
        x_opencti_order = kwargs.get("x_opencti_order", 0)
        x_opencti_color = kwargs.get("x_opencti_color", None)
        x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
        update = kwargs.get("update", False)

        if definition is None or definition_type is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: definition and definition_type",
            )
            return None

        query = (
            """
            mutation MarkingDefinitionAdd($input: MarkingDefinitionAddInput!) {
                markingDefinitionAdd(input: $input) {
                    """
            + self.properties
            + """
                }
            }
        """
        )
        result = self.opencti.query(
            query,
            {
                "input": {
                    "definition_type": definition_type,
                    "definition": definition,
                    "x_opencti_order": x_opencti_order,
                    "x_opencti_color": x_opencti_color,
                    "stix_id": stix_id,
                    "created": created,
                    "modified": modified,
                    "x_opencti_stix_ids": x_opencti_stix_ids,
                    "update": update,
                }
            },
        )
        return self.opencti.process_multiple_fields(
            result["data"]["markingDefinitionAdd"]
        )

    def update_field(self, **kwargs):
        """Update a Marking Definition object field.

        :param id: the Marking Definition id
        :type id: str
        :param input: the input of the field
        :type input: list
        :return: The updated Marking Definition object (id, standard_id and
            entity_type only), or None when a parameter is missing
        :rtype: dict or None
        """
        marking_id = kwargs.get("id", None)
        field_input = kwargs.get("input", None)

        if marking_id is None or field_input is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: id and input"
            )
            return None

        self.opencti.app_logger.info("Updating Marking Definition", {"id": marking_id})
        query = """
            mutation MarkingDefinitionEdit($id: ID!, $input: [EditInput]!) {
                markingDefinitionEdit(id: $id) {
                    fieldPatch(input: $input) {
                        id
                        standard_id
                        entity_type
                    }
                }
            }
        """
        result = self.opencti.query(
            query,
            {
                "id": marking_id,
                "input": field_input,
            },
        )
        return self.opencti.process_multiple_fields(
            result["data"]["markingDefinitionEdit"]["fieldPatch"]
        )

    @staticmethod
    def _resolve_definition(stix_object):
        """Derive ``(definition_type, definition)`` from a STIX marking object.

        Sources are tried in this order: the explicit ``x_opencti_*`` pair,
        the STIX ``definition_type`` / ``definition`` fields, then ``name``
        alone. ``definition`` may come back as None when nothing usable is
        present; the caller decides how to handle that.

        :return: the resolved pair, or None when the object carries none of
            the recognised fields
        :rtype: tuple or None
        """
        name = stix_object.get("name")

        if (
            "x_opencti_definition_type" in stix_object
            and "x_opencti_definition" in stix_object
        ):
            return (
                stix_object["x_opencti_definition_type"],
                stix_object["x_opencti_definition"],
            )

        if "definition_type" in stix_object:
            definition_type = stix_object["definition_type"]
            raw_definition = stix_object.get("definition")

            if definition_type == "tlp":
                definition_type = definition_type.upper()
                level = (
                    raw_definition.get("tlp")
                    if isinstance(raw_definition, dict)
                    else None
                )
                if isinstance(level, str):
                    return definition_type, definition_type + ":" + level.upper()
                # No usable {"tlp": ...} payload: fall back to the name
                # instead of failing on a missing key.
                return definition_type, name

            if isinstance(raw_definition, str):
                return definition_type, raw_definition
            if isinstance(raw_definition, dict) and definition_type in raw_definition:
                # STIX 2.0 style: {"definition_type": "statement",
                #                  "definition": {"statement": "..."}}
                return definition_type, raw_definition[definition_type]
            return definition_type, name

        if name is not None:
            # A "PREFIX:value" name carries its own type; anything else is
            # treated as a plain statement.
            if ":" in name:
                return name.split(":")[0], name
            return "statement", name

        return None

    def import_from_stix2(self, **kwargs):
        """Import a Marking Definition object from a STIX2 object.

        Note: when ``x_opencti_order``, ``x_opencti_color`` or
        ``x_opencti_stix_ids`` are absent from ``stixObject``, they are looked
        up in its extensions and written back onto the passed object.

        :param stixObject: the Stix-Object Marking Definition
        :type stixObject: dict
        :param update: set the update flag on import
        :type update: bool
        :return: Marking Definition object, or None when the STIX object is
            missing or carries no usable definition
        :rtype: dict or None
        """
        stix_object = kwargs.get("stixObject", None)
        update = kwargs.get("update", False)

        if stix_object is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: stixObject"
            )
            return None

        resolved = self._resolve_definition(stix_object)
        if resolved is None:
            return None
        definition_type, definition = resolved

        # Replace TLP:WHITE
        if definition == "TLP:WHITE":
            definition = "TLP:CLEAR"

        # Search in extensions
        if "x_opencti_order" not in stix_object:
            order = self.opencti.get_attribute_in_extension("order", stix_object)
            # Only set the order when found, so the default of 0 still applies.
            if order is not None:
                stix_object["x_opencti_order"] = order
        if "x_opencti_color" not in stix_object:
            stix_object["x_opencti_color"] = (
                self.opencti.get_attribute_in_extension("color", stix_object)
            )
        if "x_opencti_stix_ids" not in stix_object:
            stix_object["x_opencti_stix_ids"] = (
                self.opencti.get_attribute_in_extension("stix_ids", stix_object)
            )

        return self.create(
            stix_id=stix_object.get("id"),
            created=stix_object.get("created"),
            modified=stix_object.get("modified"),
            definition_type=definition_type,
            definition=definition,
            x_opencti_order=stix_object.get("x_opencti_order", 0),
            x_opencti_color=stix_object.get("x_opencti_color"),
            x_opencti_stix_ids=stix_object.get("x_opencti_stix_ids"),
            update=update,
        )

    def delete(self, **kwargs):
        """Delete a Marking-Definition object.

        :param id: the id of the Marking-Definition to delete
        :type id: str
        :return: None
        """
        marking_id = kwargs.get("id", None)
        if marking_id is None:
            self.opencti.app_logger.error(
                "[opencti_marking_definition] Missing parameters: id"
            )
            return None

        self.opencti.app_logger.info("Deleting Marking-Definition", {"id": marking_id})
        query = """
            mutation MarkingDefinitionEdit($id: ID!) {
                markingDefinitionEdit(id: $id) {
                    delete
                }
            }
        """
        self.opencti.query(query, {"id": marking_id})
        return None