# coding: utf-8
import json
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
class SecurityCoverage:
    """Main SecurityCoverage class for OpenCTI.

    Manages security coverage entities in the OpenCTI platform.

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    def __init__(self, opencti):
        """Initialize the SecurityCoverage instance.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        # Every method below reaches the platform through this client
        # (query execution, logging, result post-processing).
        self.opencti = opencti
        # Default GraphQL selection set, used by list() and read() whenever
        # the caller does not supply ``customAttributes``.
        self.properties = """
            id
            standard_id
            entity_type
            parent_types
            spec_version
            created_at
            updated_at
            external_uri
            objectCovered {
                __typename
                ... on StixCoreObject {
                    id
                }
            }
            objectMarking {
                id
                standard_id
                entity_type
                definition_type
                definition
                created
                modified
                x_opencti_order
                x_opencti_color
            }
        """

    @staticmethod
    def generate_id(covered_ref):
        """Generate a STIX ID for a Security Coverage.

        The ID is a UUIDv5 computed in a fixed namespace from the canonicalized
        ``{"covered_ref": ...}`` payload, so the same reference always produces
        the same ID. The reference is lower-cased and stripped of surrounding
        whitespace before hashing.

        :param covered_ref: The reference to the covered object
        :type covered_ref: str
        :return: STIX ID for the security coverage
        :rtype: str
        """
        payload = {"covered_ref": covered_ref.lower().strip()}
        canonical = canonicalize(payload, utf8=False)
        generated = str(
            uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), canonical)
        )
        return "security-coverage--" + generated

    @staticmethod
    def generate_id_from_data(data):
        """Generate a STIX ID from security coverage data.

        :param data: Dictionary containing 'covered_ref' key
        :type data: dict
        :return: STIX ID for the security coverage
        :rtype: str
        :raises KeyError: if ``data`` has no 'covered_ref' key
        """
        return SecurityCoverage.generate_id(data["covered_ref"])

    def list(self, **kwargs):
        """List SecurityCoverage objects.

        :param filters: the filters to apply
        :param search: the search keyword
        :param first: return the first n rows from the after ID (or the
            beginning if not set); defaults to 100
        :param after: ID of the first row for pagination
        :param orderBy: field to order the results by
        :param orderMode: ordering direction
        :param customAttributes: GraphQL selection set to request instead of
            the default ``self.properties``
        :param getAll: when true, keep fetching pages until the server reports
            no next page and return all rows in a single list
            (``withPagination`` is not applied in that mode)
        :param withPagination: forwarded to the client's ``process_multiple``
            when ``getAll`` is false
        :return: List of SecurityCoverage objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        search = kwargs.get("search", None)
        first = kwargs.get("first", 100)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("withPagination", False)

        self.opencti.app_logger.info(
            "Listing SecurityCoverage with filters", {"filters": json.dumps(filters)}
        )
        query = (
            """
            query SecurityCoverage($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityCoverageOrdering, $orderMode: OrderingMode) {
                securityCoverages(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
                            """
            + (custom_attributes if custom_attributes is not None else self.properties)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
            """
        )
        variables = {
            "filters": filters,
            "search": search,
            "first": first,
            "after": after,
            "orderBy": order_by,
            "orderMode": order_mode,
        }
        result = self.opencti.query(query, variables)

        if not get_all:
            return self.opencti.process_multiple(
                result["data"]["securityCoverages"], with_pagination
            )

        # getAll: accumulate the first page, then follow endCursor until the
        # server says there is nothing left.
        final_data = []
        final_data.extend(
            self.opencti.process_multiple(result["data"]["securityCoverages"])
        )
        while result["data"]["securityCoverages"]["pageInfo"]["hasNextPage"]:
            after = result["data"]["securityCoverages"]["pageInfo"]["endCursor"]
            self.opencti.app_logger.info("Listing SecurityCoverage", {"after": after})
            # Fresh dict per request so earlier variable sets are never mutated.
            result = self.opencti.query(query, dict(variables, after=after))
            final_data.extend(
                self.opencti.process_multiple(result["data"]["securityCoverages"])
            )
        return final_data

    def read(self, **kwargs):
        """Read a SecurityCoverage object.

        Looks the entity up by ``id`` when given; otherwise falls back to
        listing with ``filters`` and returning the first match.

        :param id: the id of the SecurityCoverage
        :type id: str
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :param customAttributes: custom attributes to return
        :type customAttributes: str
        :return: SecurityCoverage object
        :rtype: dict or None
        """
        entity_id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)
        custom_attributes = kwargs.get("customAttributes", None)

        if entity_id is not None:
            self.opencti.app_logger.info("Reading SecurityCoverage", {"id": entity_id})
            query = (
                """
                query SecurityCoverage($id: String!) {
                    securityCoverage(id: $id) {
                        """
                + (
                    custom_attributes
                    if custom_attributes is not None
                    else self.properties
                )
                + """
                    }
                }
                """
            )
            result = self.opencti.query(query, {"id": entity_id})
            return self.opencti.process_multiple_fields(
                result["data"]["securityCoverage"]
            )

        if filters is not None:
            # Forward customAttributes so the filter path returns the same
            # fields the caller asked for, as the id path does.
            matches = self.list(filters=filters, customAttributes=custom_attributes)
            return matches[0] if len(matches) > 0 else None

        self.opencti.app_logger.error(
            "[opencti_security_coverage] Missing parameters: id or filters"
        )
        return None

    def create(self, **kwargs):
        """Create a Security coverage object.

        :param name: the name of the Security Coverage (required)
        :type name: str
        :param objectCovered: the covered object ID (required)
        :type objectCovered: str
        :param stix_id: (optional) the STIX ID
        :type stix_id: str
        :param description: (optional) description
        :type description: str
        :param createdBy: (optional) the author ID
        :type createdBy: str
        :param objectMarking: (optional) list of marking definition IDs
        :type objectMarking: list
        :param objectLabel: (optional) list of label IDs
        :type objectLabel: list
        :param externalReferences: (optional) list of external reference IDs
        :type externalReferences: list
        :param external_uri: (optional) external URI
        :type external_uri: str
        :param coverage_last_result: (optional) last result date
        :type coverage_last_result: str
        :param coverage_valid_from: (optional) valid from date
        :type coverage_valid_from: str
        :param coverage_valid_to: (optional) valid to date
        :type coverage_valid_to: str
        :param coverage_information: (optional) coverage information
        :type coverage_information: list
        :param auto_enrichment_disable: (optional) disable auto enrichment
        :type auto_enrichment_disable: bool
        :param periodicity: (optional) periodicity
        :type periodicity: str
        :param duration: (optional) duration
        :type duration: str
        :param type_affinity: (optional) type affinity
        :type type_affinity: str
        :param platforms_affinity: (optional) platforms affinity
        :type platforms_affinity: list
        :param x_opencti_stix_ids: (optional) additional STIX IDs
        :type x_opencti_stix_ids: list
        :param files: (optional) list of File objects to attach
        :type files: list
        :param filesMarkings: (optional) list of lists of marking definition IDs for each file
        :type filesMarkings: list
        :param noTriggerImport: (optional) forwarded as-is in the mutation input
        :param embedded: (optional) forwarded as-is in the mutation input
        :return: Security Coverage object
        :rtype: dict or None
        """
        name = kwargs.get("name", None)
        object_covered = kwargs.get("objectCovered", None)

        # Both fields are mandatory; bail out before touching the API.
        if name is None or object_covered is None:
            self.opencti.app_logger.error(
                "[opencti_security_coverage] "
                "Missing parameters: name or object_covered"
            )
            return None

        self.opencti.app_logger.info("Creating Security Coverage", {"name": name})
        query = """
            mutation SecurityCoverageAdd($input: SecurityCoverageAddInput!) {
                securityCoverageAdd(input: $input) {
                    id
                    standard_id
                    entity_type
                    parent_types
                }
            }
        """
        # Every input key is always sent; options the caller omitted go as None.
        mutation_input = {
            "stix_id": kwargs.get("stix_id", None),
            "name": name,
            "description": kwargs.get("description", None),
            "createdBy": kwargs.get("createdBy", None),
            "objectMarking": kwargs.get("objectMarking", None),
            "objectLabel": kwargs.get("objectLabel", None),
            "objectCovered": object_covered,
            "external_uri": kwargs.get("external_uri", None),
            "externalReferences": kwargs.get("externalReferences", None),
            "coverage_last_result": kwargs.get("coverage_last_result", None),
            "coverage_valid_from": kwargs.get("coverage_valid_from", None),
            "coverage_valid_to": kwargs.get("coverage_valid_to", None),
            "coverage_information": kwargs.get("coverage_information", None),
            "auto_enrichment_disable": kwargs.get("auto_enrichment_disable", None),
            "periodicity": kwargs.get("periodicity", None),
            "duration": kwargs.get("duration", None),
            "type_affinity": kwargs.get("type_affinity", None),
            "platforms_affinity": kwargs.get("platforms_affinity", None),
            "x_opencti_stix_ids": kwargs.get("x_opencti_stix_ids", None),
            "files": kwargs.get("files", None),
            "filesMarkings": kwargs.get("filesMarkings", None),
            "noTriggerImport": kwargs.get("noTriggerImport", None),
            "embedded": kwargs.get("embedded", None),
        }
        result = self.opencti.query(query, {"input": mutation_input})
        return self.opencti.process_multiple_fields(
            result["data"]["securityCoverageAdd"]
        )

    def import_from_stix2(self, **kwargs):
        """Import a Security coverage from a STIX2 object.

        Note that ``stixObject`` is modified in place: missing
        ``x_opencti_stix_ids`` / ``x_opencti_granted_refs`` keys are filled in
        from the object's extensions.

        :param stixObject: the STIX2 Security coverage object
        :type stixObject: dict
        :param extras: extra parameters including created_by_id, object_marking_ids, etc.
        :type extras: dict
        :param update: whether to update if the entity already exists
            (NOTE(review): not read by this method — confirm whether it should be)
        :type update: bool
        :return: Security coverage object
        :rtype: dict or None
        """
        stix_object = kwargs.get("stixObject", None)
        # Tolerate an explicit ``extras=None`` as well as a missing argument.
        extras = kwargs.get("extras") or {}

        if stix_object is None:
            self.opencti.app_logger.error(
                "[opencti_security_coverage] Missing parameters: stixObject"
            )
            return None

        # Search in extensions
        if "x_opencti_stix_ids" not in stix_object:
            stix_object["x_opencti_stix_ids"] = (
                self.opencti.get_attribute_in_extension("stix_ids", stix_object)
            )
        # NOTE(review): granted_refs is resolved here but not forwarded to
        # create() below — confirm whether that is intentional.
        if "x_opencti_granted_refs" not in stix_object:
            stix_object["x_opencti_granted_refs"] = (
                self.opencti.get_attribute_in_extension("granted_refs", stix_object)
            )

        # Map STIX ``coverage`` entries to the platform's input shape; a
        # missing or null ``coverage`` yields an empty list.
        coverage_information = [
            {"coverage_name": cov["name"], "coverage_score": cov["score"]}
            for cov in (stix_object.get("coverage") or [])
        ]

        description = (
            self.opencti.stix2.convert_markdown(stix_object["description"])
            if "description" in stix_object
            else None
        )

        return self.create(
            stix_id=stix_object["id"],
            name=stix_object["name"],
            external_uri=stix_object.get("external_uri"),
            auto_enrichment_disable=stix_object.get("auto_enrichment_disable", False),
            periodicity=stix_object.get("periodicity"),
            duration=stix_object.get("duration"),
            type_affinity=stix_object.get("type_affinity"),
            platforms_affinity=stix_object.get("platforms_affinity"),
            coverage_last_result=stix_object.get("last_result"),
            coverage_valid_from=stix_object.get("valid_from"),
            coverage_valid_to=stix_object.get("valid_to"),
            coverage_information=coverage_information,
            description=description,
            createdBy=extras.get("created_by_id"),
            objectMarking=extras.get("object_marking_ids"),
            objectLabel=extras.get("object_label_ids"),
            objectCovered=stix_object.get("covered_ref"),
            externalReferences=extras.get("external_references_ids"),
            x_opencti_stix_ids=stix_object.get("x_opencti_stix_ids"),
            files=extras.get("files"),
            filesMarkings=extras.get("filesMarkings"),
            noTriggerImport=extras.get("noTriggerImport", None),
            embedded=extras.get("embedded", None),
        )