# coding: utf-8
import json
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
class Vulnerability:
"""Main Vulnerability class for OpenCTI
Manages vulnerability information including CVE data in the OpenCTI platform.
:param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type opencti: OpenCTIApiClient
"""
def __init__(self, opencti):
"""Initialize the Vulnerability instance.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.properties = """
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
status {
id
template {
id
name
color
}
}
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
id
value
color
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectOrganization {
id
standard_id
name
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
objectLabel {
id
value
color
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
revoked
confidence
created
modified
name
description
x_opencti_aliases
x_opencti_cvss_vector_string
x_opencti_cvss_base_score
x_opencti_cvss_base_severity
x_opencti_cvss_attack_vector
x_opencti_cvss_attack_complexity
x_opencti_cvss_privileges_required
x_opencti_cvss_user_interaction
x_opencti_cvss_scope
x_opencti_cvss_confidentiality_impact
x_opencti_cvss_integrity_impact
x_opencti_cvss_availability_impact
x_opencti_cvss_exploit_code_maturity
x_opencti_cvss_remediation_level
x_opencti_cvss_report_confidence
x_opencti_cvss_temporal_score
x_opencti_cvss_v2_vector_string
x_opencti_cvss_v2_base_score
x_opencti_cvss_v2_access_vector
x_opencti_cvss_v2_access_complexity
x_opencti_cvss_v2_authentication
x_opencti_cvss_v2_confidentiality_impact
x_opencti_cvss_v2_integrity_impact
x_opencti_cvss_v2_availability_impact
x_opencti_cvss_v2_exploitability
x_opencti_cvss_v2_remediation_level
x_opencti_cvss_v2_report_confidence
x_opencti_cvss_v2_temporal_score
x_opencti_cvss_v4_vector_string
x_opencti_cvss_v4_base_score
x_opencti_cvss_v4_base_severity
x_opencti_cvss_v4_attack_vector
x_opencti_cvss_v4_attack_complexity
x_opencti_cvss_v4_attack_requirements
x_opencti_cvss_v4_privileges_required
x_opencti_cvss_v4_user_interaction
x_opencti_cvss_v4_confidentiality_impact_v
x_opencti_cvss_v4_confidentiality_impact_s
x_opencti_cvss_v4_integrity_impact_v
x_opencti_cvss_v4_integrity_impact_s
x_opencti_cvss_v4_availability_impact_v
x_opencti_cvss_v4_availability_impact_s
x_opencti_cvss_v4_exploit_maturity
x_opencti_cwe
x_opencti_first_seen_active
x_opencti_cisa_kev
x_opencti_epss_score
x_opencti_epss_percentile
x_opencti_score
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
objectMarking {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
}
}
}
"""
@staticmethod
def generate_id(name):
    """Generate a STIX ID for a Vulnerability.

    The identifier is a UUIDv5 computed over the canonicalized
    ``{"name": ...}`` object, so it is deterministic: the same name, once
    lower-cased and stripped, always maps to the same ID. It identifies the
    entity and carries no secrecy.

    :param name: The name of the vulnerability (e.g., CVE ID)
    :type name: str
    :return: STIX ID for the vulnerability
    :rtype: str
    """
    # Normalize first so differences in case or surrounding whitespace do
    # not produce different IDs.
    normalized = name.lower().strip()
    canonical = canonicalize({"name": normalized}, utf8=False)
    # Fixed namespace UUID used for the UUIDv5 derivation.
    namespace = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
    return "vulnerability--" + str(uuid.uuid5(namespace, canonical))
@staticmethod
def generate_id_from_data(data):
    """Generate a STIX ID from vulnerability data.

    Only the ``name`` entry is read; a missing key raises ``KeyError``.

    :param data: Dictionary containing 'name' key
    :type data: dict
    :return: STIX ID for the vulnerability
    :rtype: str
    """
    vulnerability_name = data["name"]
    return Vulnerability.generate_id(vulnerability_name)
def list(self, **kwargs):
"""List Vulnerability objects.
:param filters: the filters to apply
:type filters: dict
:param search: the search keyword
:type search: str
:param first: return the first n rows from the after ID (or the beginning if not set)
:type first: int
:param after: ID of the first row for pagination
:type after: str
:param orderBy: field to order results by
:type orderBy: str
:param orderMode: ordering mode (asc/desc)
:type orderMode: str
:param customAttributes: custom attributes to return
:type customAttributes: str
:param getAll: whether to retrieve all results
:type getAll: bool
:param withPagination: whether to include pagination info
:type withPagination: bool
:return: List of Vulnerability objects
:rtype: list
"""
filters = kwargs.get("filters", None)
search = kwargs.get("search", None)
first = kwargs.get("first", 100)
after = kwargs.get("after", None)
order_by = kwargs.get("orderBy", None)
order_mode = kwargs.get("orderMode", None)
custom_attributes = kwargs.get("customAttributes", None)
get_all = kwargs.get("getAll", False)
with_pagination = kwargs.get("withPagination", False)
self.opencti.app_logger.info(
"Listing Vulnerabilities with filters", {"filters": json.dumps(filters)}
)
query = (
"""
query Vulnerabilities($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: VulnerabilitiesOrdering, $orderMode: OrderingMode) {
vulnerabilities(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
edges {
node {
"""
+ (custom_attributes if custom_attributes is not None else self.properties)
+ """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
)
result = self.opencti.query(
query,
{
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
},
)
if get_all:
final_data = []
data = self.opencti.process_multiple(result["data"]["vulnerabilities"])
final_data = final_data + data
while result["data"]["vulnerabilities"]["pageInfo"]["hasNextPage"]:
after = result["data"]["vulnerabilities"]["pageInfo"]["endCursor"]
self.opencti.app_logger.debug(
"Listing Vulnerabilities", {"after": after}
)
result = self.opencti.query(
query,
{
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
},
)
data = self.opencti.process_multiple(result["data"]["vulnerabilities"])
final_data = final_data + data
return final_data
else:
return self.opencti.process_multiple(
result["data"]["vulnerabilities"], with_pagination
)
def read(self, **kwargs):
"""Read a Vulnerability object.
:param id: the id of the Vulnerability
:type id: str
:param filters: the filters to apply if no id provided
:type filters: dict
:param customAttributes: custom attributes to return
:type customAttributes: str
:return: Vulnerability object
:rtype: dict or None
"""
id = kwargs.get("id", None)
filters = kwargs.get("filters", None)
custom_attributes = kwargs.get("customAttributes", None)
if id is not None:
self.opencti.app_logger.info("Reading Vulnerability", {"id": id})
query = (
"""
query Vulnerability($id: String!) {
vulnerability(id: $id) {
"""
+ (
custom_attributes
if custom_attributes is not None
else self.properties
)
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(result["data"]["vulnerability"])
elif filters is not None:
result = self.list(filters=filters)
if len(result) > 0:
return result[0]
else:
return None
else:
self.opencti.app_logger.error(
"[opencti_vulnerability] Missing parameters: id or filters"
)
return None
def create(self, **kwargs):
"""Create a Vulnerability object.
:param name: the name of the Vulnerability (required)
:type name: str
:param stix_id: (optional) the STIX ID
:type stix_id: str
:param createdBy: (optional) the author ID
:type createdBy: str
:param objectMarking: (optional) list of marking definition IDs
:type objectMarking: list
:param objectLabel: (optional) list of label IDs
:type objectLabel: list
:param externalReferences: (optional) list of external reference IDs
:type externalReferences: list
:param revoked: (optional) whether the vulnerability is revoked
:type revoked: bool
:param confidence: (optional) confidence level (0-100)
:type confidence: int
:param lang: (optional) language
:type lang: str
:param created: (optional) creation date
:type created: str
:param modified: (optional) modification date
:type modified: str
:param description: (optional) description
:type description: str
:param x_opencti_aliases: (optional) list of aliases
:type x_opencti_aliases: list
:param x_opencti_cvss_vector_string: (optional) CVSS v3 vector string
:type x_opencti_cvss_vector_string: str
:param x_opencti_cvss_base_score: (optional) CVSS v3 base score
:type x_opencti_cvss_base_score: float
:param x_opencti_cvss_base_severity: (optional) CVSS v3 base severity
:type x_opencti_cvss_base_severity: str
:param x_opencti_cwe: (optional) CWE ID
:type x_opencti_cwe: str
:param x_opencti_cisa_kev: (optional) CISA KEV flag
:type x_opencti_cisa_kev: bool
:param x_opencti_epss_score: (optional) EPSS score
:type x_opencti_epss_score: float
:param x_opencti_epss_percentile: (optional) EPSS percentile
:type x_opencti_epss_percentile: float
:param x_opencti_score: (optional) OpenCTI score
:type x_opencti_score: int
:param x_opencti_first_seen_active: (optional) first seen active date
:type x_opencti_first_seen_active: str
:param x_opencti_stix_ids: (optional) list of additional STIX IDs
:type x_opencti_stix_ids: list
:param objectOrganization: (optional) list of organization IDs
:type objectOrganization: list
:param x_opencti_workflow_id: (optional) workflow ID
:type x_opencti_workflow_id: str
:param x_opencti_modified_at: (optional) custom modification date
:type x_opencti_modified_at: str
:param update: (optional) whether to update if exists (default: False)
:type update: bool
:param files: (optional) list of File objects to attach
:type files: list
:param filesMarkings: (optional) list of lists of marking definition IDs for each file
:type filesMarkings: list
:return: Vulnerability object
:rtype: dict or None
"""
stix_id = kwargs.get("stix_id", None)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
external_references = kwargs.get("externalReferences", None)
revoked = kwargs.get("revoked", None)
confidence = kwargs.get("confidence", None)
lang = kwargs.get("lang", None)
created = kwargs.get("created", None)
modified = kwargs.get("modified", None)
name = kwargs.get("name", None)
description = kwargs.get("description", None)
x_opencti_aliases = kwargs.get("x_opencti_aliases", None)
# CVSS3
x_opencti_cvss_vector_string = kwargs.get("x_opencti_cvss_vector_string", None)
x_opencti_cvss_base_score = kwargs.get("x_opencti_cvss_base_score", None)
x_opencti_cvss_base_severity = kwargs.get("x_opencti_cvss_base_severity", None)
x_opencti_cvss_attack_vector = kwargs.get("x_opencti_cvss_attack_vector", None)
x_opencti_cvss_attack_complexity = kwargs.get(
"x_opencti_cvss_attack_complexity", None
)
x_opencti_cvss_privileges_required = kwargs.get(
"x_opencti_cvss_privileges_required", None
)
x_opencti_cvss_user_interaction = kwargs.get(
"x_opencti_cvss_user_interaction", None
)
x_opencti_cvss_scope = kwargs.get("x_opencti_cvss_scope", None)
x_opencti_cvss_confidentiality_impact = kwargs.get(
"x_opencti_cvss_confidentiality_impact", None
)
x_opencti_cvss_integrity_impact = kwargs.get(
"x_opencti_cvss_integrity_impact", None
)
x_opencti_cvss_availability_impact = kwargs.get(
"x_opencti_cvss_availability_impact", None
)
x_opencti_cvss_exploit_code_maturity = kwargs.get(
"x_opencti_cvss_exploit_code_maturity", None
)
x_opencti_cvss_remediation_level = kwargs.get(
"x_opencti_cvss_remediation_level", None
)
x_opencti_cvss_report_confidence = kwargs.get(
"x_opencti_cvss_report_confidence", None
)
x_opencti_cvss_temporal_score = kwargs.get(
"x_opencti_cvss_temporal_score", None
)
# CVSS2
x_opencti_cvss_v2_vector_string = kwargs.get(
"x_opencti_cvss_v2_vector_string", None
)
x_opencti_cvss_v2_base_score = kwargs.get("x_opencti_cvss_v2_base_score", None)
x_opencti_cvss_v2_access_vector = kwargs.get(
"x_opencti_cvss_v2_access_vector", None
)
x_opencti_cvss_v2_access_complexity = kwargs.get(
"x_opencti_cvss_v2_access_complexity", None
)
x_opencti_cvss_v2_authentication = kwargs.get(
"x_opencti_cvss_v2_authentication", None
)
x_opencti_cvss_v2_confidentiality_impact = kwargs.get(
"x_opencti_cvss_v2_confidentiality_impact", None
)
x_opencti_cvss_v2_integrity_impact = kwargs.get(
"x_opencti_cvss_v2_integrity_impact", None
)
x_opencti_cvss_v2_availability_impact = kwargs.get(
"x_opencti_cvss_v2_availability_impact", None
)
x_opencti_cvss_v2_exploitability = kwargs.get(
"x_opencti_cvss_v2_exploitability", None
)
x_opencti_cvss_v2_remediation_level = kwargs.get(
"x_opencti_cvss_v2_remediation_level", None
)
x_opencti_cvss_v2_report_confidence = kwargs.get(
"x_opencti_cvss_v2_report_confidence", None
)
x_opencti_cvss_v2_temporal_score = kwargs.get(
"x_opencti_cvss_v2_temporal_score", None
)
# CVSS4
x_opencti_cvss_v4_vector_string = kwargs.get(
"x_opencti_cvss_v4_vector_string", None
)
x_opencti_cvss_v4_base_score = kwargs.get("x_opencti_cvss_v4_base_score", None)
x_opencti_cvss_v4_base_severity = kwargs.get(
"x_opencti_cvss_v4_base_severity", None
)
x_opencti_cvss_v4_attack_vector = kwargs.get(
"x_opencti_cvss_v4_attack_vector", None
)
x_opencti_cvss_v4_attack_complexity = kwargs.get(
"x_opencti_cvss_v4_attack_complexity", None
)
x_opencti_cvss_v4_attack_requirements = kwargs.get(
"x_opencti_cvss_v4_attack_requirements", None
)
x_opencti_cvss_v4_privileges_required = kwargs.get(
"x_opencti_cvss_v4_privileges_required", None
)
x_opencti_cvss_v4_user_interaction = kwargs.get(
"x_opencti_cvss_v4_user_interaction", None
)
x_opencti_cvss_v4_confidentiality_impact_v = kwargs.get(
"x_opencti_cvss_v4_confidentiality_impact_v", None
)
x_opencti_cvss_v4_confidentiality_impact_s = kwargs.get(
"x_opencti_cvss_v4_confidentiality_impact_s", None
)
x_opencti_cvss_v4_integrity_impact_v = kwargs.get(
"x_opencti_cvss_v4_integrity_impact_v", None
)
x_opencti_cvss_v4_integrity_impact_s = kwargs.get(
"x_opencti_cvss_v4_integrity_impact_s", None
)
x_opencti_cvss_v4_availability_impact_v = kwargs.get(
"x_opencti_cvss_v4_availability_impact_v", None
)
x_opencti_cvss_v4_availability_impact_s = kwargs.get(
"x_opencti_cvss_v4_availability_impact_s", None
)
x_opencti_cvss_v4_exploit_maturity = kwargs.get(
"x_opencti_cvss_v4_exploit_maturity", None
)
# Others
x_opencti_cwe = kwargs.get("x_opencti_cwe", None)
x_opencti_cisa_kev = kwargs.get("x_opencti_cisa_kev", None)
x_opencti_epss_score = kwargs.get("x_opencti_epss_score", None)
x_opencti_epss_percentile = kwargs.get("x_opencti_epss_percentile", None)
x_opencti_score = kwargs.get("x_opencti_score", None)
x_opencti_first_seen_active = kwargs.get("x_opencti_first_seen_active", None)
x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
granted_refs = kwargs.get("objectOrganization", None)
x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
x_opencti_modified_at = kwargs.get("x_opencti_modified_at", None)
update = kwargs.get("update", False)
files = kwargs.get("files", None)
files_markings = kwargs.get("filesMarkings", None)
no_trigger_import = kwargs.get("noTriggerImport", None)
embedded = kwargs.get("embedded", None)
upsert_operations = kwargs.get("upsert_operations", None)
if name is not None:
self.opencti.app_logger.info("Creating Vulnerability", {"name": name})
query = """
mutation VulnerabilityAdd($input: VulnerabilityAddInput!) {
vulnerabilityAdd(input: $input) {
id
standard_id
entity_type
parent_types
}
}
"""
result = self.opencti.query(
query,
{
"input": {
"stix_id": stix_id,
"createdBy": created_by,
"objectMarking": object_marking,
"objectLabel": object_label,
"objectOrganization": granted_refs,
"externalReferences": external_references,
"revoked": revoked,
"confidence": confidence,
"lang": lang,
"created": created,
"modified": modified,
"name": name,
"description": description,
"x_opencti_aliases": x_opencti_aliases,
# CVSS3
"x_opencti_cvss_vector_string": x_opencti_cvss_vector_string,
"x_opencti_cvss_base_score": x_opencti_cvss_base_score,
"x_opencti_cvss_base_severity": x_opencti_cvss_base_severity,
"x_opencti_cvss_attack_vector": x_opencti_cvss_attack_vector,
"x_opencti_cvss_attack_complexity": x_opencti_cvss_attack_complexity,
"x_opencti_cvss_privileges_required": x_opencti_cvss_privileges_required,
"x_opencti_cvss_user_interaction": x_opencti_cvss_user_interaction,
"x_opencti_cvss_scope": x_opencti_cvss_scope,
"x_opencti_cvss_confidentiality_impact": x_opencti_cvss_confidentiality_impact,
"x_opencti_cvss_integrity_impact": x_opencti_cvss_integrity_impact,
"x_opencti_cvss_availability_impact": x_opencti_cvss_availability_impact,
"x_opencti_cvss_exploit_code_maturity": x_opencti_cvss_exploit_code_maturity,
"x_opencti_cvss_remediation_level": x_opencti_cvss_remediation_level,
"x_opencti_cvss_report_confidence": x_opencti_cvss_report_confidence,
"x_opencti_cvss_temporal_score": x_opencti_cvss_temporal_score,
# CVSS2
"x_opencti_cvss_v2_vector_string": x_opencti_cvss_v2_vector_string,
"x_opencti_cvss_v2_base_score": x_opencti_cvss_v2_base_score,
"x_opencti_cvss_v2_access_vector": x_opencti_cvss_v2_access_vector,
"x_opencti_cvss_v2_access_complexity": x_opencti_cvss_v2_access_complexity,
"x_opencti_cvss_v2_authentication": x_opencti_cvss_v2_authentication,
"x_opencti_cvss_v2_confidentiality_impact": x_opencti_cvss_v2_confidentiality_impact,
"x_opencti_cvss_v2_integrity_impact": x_opencti_cvss_v2_integrity_impact,
"x_opencti_cvss_v2_availability_impact": x_opencti_cvss_v2_availability_impact,
"x_opencti_cvss_v2_exploitability": x_opencti_cvss_v2_exploitability,
"x_opencti_cvss_v2_remediation_level": x_opencti_cvss_v2_remediation_level,
"x_opencti_cvss_v2_report_confidence": x_opencti_cvss_v2_report_confidence,
"x_opencti_cvss_v2_temporal_score": x_opencti_cvss_v2_temporal_score,
# CVSS 4
"x_opencti_cvss_v4_vector_string": x_opencti_cvss_v4_vector_string,
"x_opencti_cvss_v4_base_score": x_opencti_cvss_v4_base_score,
"x_opencti_cvss_v4_base_severity": x_opencti_cvss_v4_base_severity,
"x_opencti_cvss_v4_attack_vector": x_opencti_cvss_v4_attack_vector,
"x_opencti_cvss_v4_attack_complexity": x_opencti_cvss_v4_attack_complexity,
"x_opencti_cvss_v4_attack_requirements": x_opencti_cvss_v4_attack_requirements,
"x_opencti_cvss_v4_privileges_required": x_opencti_cvss_v4_privileges_required,
"x_opencti_cvss_v4_user_interaction": x_opencti_cvss_v4_user_interaction,
"x_opencti_cvss_v4_confidentiality_impact_v": x_opencti_cvss_v4_confidentiality_impact_v,
"x_opencti_cvss_v4_confidentiality_impact_s": x_opencti_cvss_v4_confidentiality_impact_s,
"x_opencti_cvss_v4_integrity_impact_v": x_opencti_cvss_v4_integrity_impact_v,
"x_opencti_cvss_v4_integrity_impact_s": x_opencti_cvss_v4_integrity_impact_s,
"x_opencti_cvss_v4_availability_impact_v": x_opencti_cvss_v4_availability_impact_v,
"x_opencti_cvss_v4_availability_impact_s": x_opencti_cvss_v4_availability_impact_s,
"x_opencti_cvss_v4_exploit_maturity": x_opencti_cvss_v4_exploit_maturity,
# Others
"x_opencti_cwe": x_opencti_cwe,
"x_opencti_cisa_kev": x_opencti_cisa_kev,
"x_opencti_epss_score": x_opencti_epss_score,
"x_opencti_epss_percentile": x_opencti_epss_percentile,
"x_opencti_score": x_opencti_score,
"x_opencti_first_seen_active": x_opencti_first_seen_active,
"x_opencti_stix_ids": x_opencti_stix_ids,
"x_opencti_workflow_id": x_opencti_workflow_id,
"x_opencti_modified_at": x_opencti_modified_at,
"update": update,
"files": files,
"filesMarkings": files_markings,
"noTriggerImport": no_trigger_import,
"embedded": embedded,
"upsertOperations": upsert_operations,
}
},
)
return self.opencti.process_multiple_fields(
result["data"]["vulnerabilityAdd"]
)
else:
self.opencti.app_logger.error(
"[opencti_vulnerability] Missing parameters: name"
)
return None
def import_from_stix2(self, **kwargs):
"""Import a Vulnerability object from a STIX2 object.
:param stixObject: the STIX2 Vulnerability object
:type stixObject: dict
:param extras: extra parameters including created_by_id, object_marking_ids, etc.
:type extras: dict
:param update: whether to update if the entity already exists
:type update: bool
:return: Vulnerability object
:rtype: dict or None
"""
stix_object = kwargs.get("stixObject", None)
extras = kwargs.get("extras", {})
update = kwargs.get("update", False)
if stix_object is not None:
# Search in extensions
if "x_opencti_stix_ids" not in stix_object:
stix_object["x_opencti_stix_ids"] = (
self.opencti.get_attribute_in_extension("stix_ids", stix_object)
)
if "x_opencti_granted_refs" not in stix_object:
stix_object["x_opencti_granted_refs"] = (
self.opencti.get_attribute_in_extension("granted_refs", stix_object)
)
if "x_opencti_workflow_id" not in stix_object:
stix_object["x_opencti_workflow_id"] = (
self.opencti.get_attribute_in_extension("workflow_id", stix_object)
)
# Backward compatibility
if "x_opencti_base_score" in stix_object:
stix_object["x_opencti_cvss_base_score"] = stix_object[
"x_opencti_base_score"
]
if "x_opencti_base_severity" in stix_object:
stix_object["x_opencti_cvss_base_severity"] = stix_object[
"x_opencti_base_severity"
]
if "x_opencti_attack_vector" in stix_object:
stix_object["x_opencti_cvss_attack_vector"] = stix_object[
"x_opencti_attack_vector"
]
if "x_opencti_integrity_impact" in stix_object:
stix_object["x_opencti_cvss_integrity_impact"] = stix_object[
"x_opencti_integrity_impact"
]
if "x_opencti_availability_impact" in stix_object:
stix_object["x_opencti_cvss_availability_impact"] = stix_object[
"x_opencti_availability_impact"
]
if "x_opencti_confidentiality_impact" in stix_object:
stix_object["x_opencti_cvss_confidentiality_impact"] = stix_object[
"x_opencti_confidentiality_impact"
]
# Search in extensions
if "x_opencti_aliases" not in stix_object:
stix_object["x_opencti_aliases"] = (
self.opencti.get_attribute_in_extension("aliases", stix_object)
)
# CVSS3
if "x_opencti_cvss_vector_string" not in stix_object:
stix_object["x_opencti_cvss_vector_string"] = (
self.opencti.get_attribute_in_extension("cvss_vector", stix_object)
)
if "x_opencti_cvss_base_score" not in stix_object:
stix_object["x_opencti_cvss_base_score"] = (
self.opencti.get_attribute_in_extension(
"cvss_base_score", stix_object
)
)
if "x_opencti_cvss_base_severity" not in stix_object:
stix_object["x_opencti_cvss_base_severity"] = (
self.opencti.get_attribute_in_extension(
"cvss_base_severity", stix_object
)
)
if "x_opencti_cvss_attack_vector" not in stix_object:
stix_object["x_opencti_cvss_attack_vector"] = (
self.opencti.get_attribute_in_extension(
"cvss_attack_vector", stix_object
)
)
if "x_opencti_cvss_attack_complexity" not in stix_object:
stix_object["x_opencti_cvss_attack_complexity"] = (
self.opencti.get_attribute_in_extension(
"cvss_attack_complexity", stix_object
)
)
if "x_opencti_cvss_privileges_required" not in stix_object:
stix_object["x_opencti_cvss_privileges_required"] = (
self.opencti.get_attribute_in_extension(
"cvss_privileges_required", stix_object
)
)
if "x_opencti_cvss_user_interaction" not in stix_object:
stix_object["x_opencti_cvss_user_interaction"] = (
self.opencti.get_attribute_in_extension(
"cvss_user_interaction", stix_object
)
)
if "x_opencti_cvss_scope" not in stix_object:
stix_object["x_opencti_cvss_scope"] = (
self.opencti.get_attribute_in_extension("cvss_scope", stix_object)
)
if "x_opencti_cvss_confidentiality_impact" not in stix_object:
stix_object["x_opencti_cvss_confidentiality_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_confidentiality_impact", stix_object
)
)
if "x_opencti_cvss_integrity_impact" not in stix_object:
stix_object["x_opencti_cvss_integrity_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_integrity_impact", stix_object
)
)
if "x_opencti_cvss_availability_impact" not in stix_object:
stix_object["x_opencti_cvss_availability_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_availability_impact", stix_object
)
)
if "x_opencti_cvss_exploit_code_maturity" not in stix_object:
stix_object["x_opencti_cvss_exploit_code_maturity"] = (
self.opencti.get_attribute_in_extension(
"cvss_exploit_code_maturity", stix_object
)
)
if "x_opencti_cvss_remediation_level" not in stix_object:
stix_object["x_opencti_cvss_remediation_level"] = (
self.opencti.get_attribute_in_extension(
"cvss_remediation_level", stix_object
)
)
if "x_opencti_cvss_report_confidence" not in stix_object:
stix_object["x_opencti_cvss_report_confidence"] = (
self.opencti.get_attribute_in_extension(
"cvss_report_confidence", stix_object
)
)
if "x_opencti_cvss_temporal_score" not in stix_object:
stix_object["x_opencti_cvss_temporal_score"] = (
self.opencti.get_attribute_in_extension(
"cvss_temporal_score", stix_object
)
)
# CVSS2
if "x_opencti_cvss_v2_vector_string" not in stix_object:
stix_object["x_opencti_cvss_v2_vector_string"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_vector", stix_object
)
)
if "x_opencti_cvss_v2_base_score" not in stix_object:
stix_object["x_opencti_cvss_v2_base_score"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_base_score", stix_object
)
)
if "x_opencti_cvss_v2_access_vector" not in stix_object:
stix_object["x_opencti_cvss_v2_access_vector"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_access_vector", stix_object
)
)
if "x_opencti_cvss_v2_access_complexity" not in stix_object:
stix_object["x_opencti_cvss_v2_access_complexity"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_access_complexity", stix_object
)
)
if "x_opencti_cvss_v2_authentication" not in stix_object:
stix_object["x_opencti_cvss_v2_authentication"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_authentication", stix_object
)
)
if "x_opencti_cvss_v2_confidentiality_impact" not in stix_object:
stix_object["x_opencti_cvss_v2_confidentiality_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_confidentiality_impact", stix_object
)
)
if "x_opencti_cvss_v2_integrity_impact" not in stix_object:
stix_object["x_opencti_cvss_v2_integrity_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_integrity_impact", stix_object
)
)
if "x_opencti_cvss_v2_availability_impact" not in stix_object:
stix_object["x_opencti_cvss_v2_availability_impact"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_availability_impact", stix_object
)
)
if "x_opencti_cvss_v2_exploitability" not in stix_object:
stix_object["x_opencti_cvss_v2_exploitability"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_exploitability", stix_object
)
)
if "x_opencti_cvss_v2_remediation_level" not in stix_object:
stix_object["x_opencti_cvss_v2_remediation_level"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_remediation_level", stix_object
)
)
if "x_opencti_cvss_v2_report_confidence" not in stix_object:
stix_object["x_opencti_cvss_v2_report_confidence"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_report_confidence", stix_object
)
)
if "x_opencti_cvss_v2_temporal_score" not in stix_object:
stix_object["x_opencti_cvss_v2_temporal_score"] = (
self.opencti.get_attribute_in_extension(
"cvss_v2_temporal_score", stix_object
)
)
# CVSS4
if "x_opencti_cvss_v4_vector_string" not in stix_object:
stix_object["x_opencti_cvss_v4_vector_string"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_vector", stix_object
)
)
if "x_opencti_cvss_v4_base_score" not in stix_object:
stix_object["x_opencti_cvss_v4_base_score"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_base_score", stix_object
)
)
if "x_opencti_cvss_v4_base_severity" not in stix_object:
stix_object["x_opencti_cvss_v4_base_severity"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_base_severity", stix_object
)
)
if "x_opencti_cvss_v4_attack_vector" not in stix_object:
stix_object["x_opencti_cvss_v4_attack_vector"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_attack_vector", stix_object
)
)
if "x_opencti_cvss_v4_attack_complexity" not in stix_object:
stix_object["x_opencti_cvss_v4_attack_complexity"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_attack_complexity", stix_object
)
)
if "x_opencti_cvss_v4_attack_requirements" not in stix_object:
stix_object["x_opencti_cvss_v4_attack_requirements"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_attack_requirements", stix_object
)
)
if "x_opencti_cvss_v4_privileges_required" not in stix_object:
stix_object["x_opencti_cvss_v4_privileges_required"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_privileges_required", stix_object
)
)
if "x_opencti_cvss_v4_user_interaction" not in stix_object:
stix_object["x_opencti_cvss_v4_user_interaction"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_user_interaction", stix_object
)
)
if "x_opencti_cvss_v4_confidentiality_impact_v" not in stix_object:
stix_object["x_opencti_cvss_v4_confidentiality_impact_v"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_confidentiality_impact_v", stix_object
)
)
if "x_opencti_cvss_v4_confidentiality_impact_s" not in stix_object:
stix_object["x_opencti_cvss_v4_confidentiality_impact_s"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_confidentiality_impact_s", stix_object
)
)
if "x_opencti_cvss_v4_integrity_impact_v" not in stix_object:
stix_object["x_opencti_cvss_v4_integrity_impact_v"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_integrity_impact_v", stix_object
)
)
if "x_opencti_cvss_v4_integrity_impact_s" not in stix_object:
stix_object["x_opencti_cvss_v4_integrity_impact_s"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_integrity_impact_s", stix_object
)
)
if "x_opencti_cvss_v4_availability_impact_v" not in stix_object:
stix_object["x_opencti_cvss_v4_availability_impact_v"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_availability_impact_v", stix_object
)
)
if "x_opencti_cvss_v4_availability_impact_s" not in stix_object:
stix_object["x_opencti_cvss_v4_availability_impact_s"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_availability_impact_s", stix_object
)
)
if "x_opencti_cvss_v4_exploit_maturity" not in stix_object:
stix_object["x_opencti_cvss_v4_exploit_maturity"] = (
self.opencti.get_attribute_in_extension(
"cvss_v4_exploit_maturity", stix_object
)
)
# Others
if "x_opencti_cwe" not in stix_object:
stix_object["x_opencti_cwe"] = self.opencti.get_attribute_in_extension(
"cwe", stix_object
)
if "x_opencti_cisa_kev" not in stix_object:
stix_object["x_opencti_cisa_kev"] = (
self.opencti.get_attribute_in_extension("cisa_kev", stix_object)
)
if "x_opencti_epss_score" not in stix_object:
stix_object["x_opencti_epss_score"] = (
self.opencti.get_attribute_in_extension("epss_score", stix_object)
)
if "x_opencti_epss_percentile" not in stix_object:
stix_object["x_opencti_epss_percentile"] = (
self.opencti.get_attribute_in_extension(
"epss_percentile", stix_object
)
)
if "x_opencti_score" not in stix_object:
stix_object["x_opencti_score"] = (
self.opencti.get_attribute_in_extension("score", stix_object)
)
if "x_opencti_first_seen_active" not in stix_object:
stix_object["x_opencti_first_seen_active"] = (
self.opencti.get_attribute_in_extension(
"first_seen_active", stix_object
)
)
if "x_opencti_modified_at" not in stix_object:
stix_object["x_opencti_modified_at"] = (
self.opencti.get_attribute_in_extension("modified_at", stix_object)
)
if "opencti_upsert_operations" not in stix_object:
stix_object["opencti_upsert_operations"] = (
self.opencti.get_attribute_in_extension(
"opencti_upsert_operations", stix_object
)
)
return self.create(
stix_id=stix_object["id"],
createdBy=(
extras["created_by_id"] if "created_by_id" in extras else None
),
objectMarking=(
extras["object_marking_ids"]
if "object_marking_ids" in extras
else None
),
objectLabel=(
extras["object_label_ids"] if "object_label_ids" in extras else None
),
externalReferences=(
extras["external_references_ids"]
if "external_references_ids" in extras
else None
),
revoked=stix_object["revoked"] if "revoked" in stix_object else None,
confidence=(
stix_object["confidence"] if "confidence" in stix_object else None
),
lang=stix_object["lang"] if "lang" in stix_object else None,
created=stix_object["created"] if "created" in stix_object else None,
modified=stix_object["modified"] if "modified" in stix_object else None,
name=stix_object["name"],
description=(
self.opencti.stix2.convert_markdown(stix_object["description"])
if "description" in stix_object
else None
),
x_opencti_aliases=(
stix_object["x_opencti_aliases"]
if "x_opencti_aliases" in stix_object
else None
),
x_opencti_stix_ids=(
stix_object["x_opencti_stix_ids"]
if "x_opencti_stix_ids" in stix_object
else None
),
objectOrganization=(
stix_object["x_opencti_granted_refs"]
if "x_opencti_granted_refs" in stix_object
else None
),
x_opencti_workflow_id=(
stix_object["x_opencti_workflow_id"]
if "x_opencti_workflow_id" in stix_object
else None
),
x_opencti_modified_at=(
stix_object["x_opencti_modified_at"]
if "x_opencti_modified_at" in stix_object
else None
),
# CVSS3
x_opencti_cvss_vector_string=(
stix_object["x_opencti_cvss_vector_string"]
if "x_opencti_cvss_vector_string" in stix_object
else None
),
x_opencti_cvss_base_score=(
stix_object["x_opencti_cvss_base_score"]
if "x_opencti_cvss_base_score" in stix_object
else None
),
x_opencti_cvss_base_severity=(
stix_object["x_opencti_cvss_base_severity"]
if "x_opencti_cvss_base_severity" in stix_object
else None
),
x_opencti_cvss_attack_vector=(
stix_object["x_opencti_cvss_attack_vector"]
if "x_opencti_cvss_attack_vector" in stix_object
else None
),
x_opencti_cvss_attack_complexity=(
stix_object["x_opencti_cvss_attack_complexity"]
if "x_opencti_cvss_attack_complexity" in stix_object
else None
),
x_opencti_cvss_privileges_required=(
stix_object["x_opencti_cvss_privileges_required"]
if "x_opencti_cvss_privileges_required" in stix_object
else None
),
x_opencti_cvss_user_interaction=(
stix_object["x_opencti_cvss_user_interaction"]
if "x_opencti_cvss_user_interaction" in stix_object
else None
),
x_opencti_cvss_scope=(
stix_object["x_opencti_cvss_scope"]
if "x_opencti_cvss_scope" in stix_object
else None
),
x_opencti_cvss_confidentiality_impact=(
stix_object["x_opencti_cvss_confidentiality_impact"]
if "x_opencti_cvss_confidentiality_impact" in stix_object
else None
),
x_opencti_cvss_integrity_impact=(
stix_object["x_opencti_cvss_integrity_impact"]
if "x_opencti_cvss_integrity_impact" in stix_object
else None
),
x_opencti_cvss_availability_impact=(
stix_object["x_opencti_cvss_availability_impact"]
if "x_opencti_cvss_availability_impact" in stix_object
else None
),
x_opencti_cvss_exploit_code_maturity=(
stix_object["x_opencti_cvss_exploit_code_maturity"]
if "x_opencti_cvss_exploit_code_maturity" in stix_object
else None
),
x_opencti_cvss_remediation_level=(
stix_object["x_opencti_cvss_remediation_level"]
if "x_opencti_cvss_remediation_level" in stix_object
else None
),
x_opencti_cvss_report_confidence=(
stix_object["x_opencti_cvss_report_confidence"]
if "x_opencti_cvss_report_confidence" in stix_object
else None
),
x_opencti_cvss_temporal_score=(
stix_object["x_opencti_cvss_temporal_score"]
if "x_opencti_cvss_temporal_score" in stix_object
else None
),
# CVSS2
x_opencti_cvss_v2_vector_string=(
stix_object["x_opencti_cvss_v2_vector_string"]
if "x_opencti_cvss_v2_vector_string" in stix_object
else None
),
x_opencti_cvss_v2_base_score=(
stix_object["x_opencti_cvss_v2_base_score"]
if "x_opencti_cvss_v2_base_score" in stix_object
else None
),
x_opencti_cvss_v2_access_vector=(
stix_object["x_opencti_cvss_v2_access_vector"]
if "x_opencti_cvss_v2_access_vector" in stix_object
else None
),
x_opencti_cvss_v2_access_complexity=(
stix_object["x_opencti_cvss_v2_access_complexity"]
if "x_opencti_cvss_v2_access_complexity" in stix_object
else None
),
x_opencti_cvss_v2_authentication=(
stix_object["x_opencti_cvss_v2_authentication"]
if "x_opencti_cvss_v2_authentication" in stix_object
else None
),
x_opencti_cvss_v2_confidentiality_impact=(
stix_object["x_opencti_cvss_v2_confidentiality_impact"]
if "x_opencti_cvss_v2_confidentiality_impact" in stix_object
else None
),
x_opencti_cvss_v2_integrity_impact=(
stix_object["x_opencti_cvss_v2_integrity_impact"]
if "x_opencti_cvss_v2_integrity_impact" in stix_object
else None
),
x_opencti_cvss_v2_availability_impact=(
stix_object["x_opencti_cvss_v2_availability_impact"]
if "x_opencti_cvss_v2_availability_impact" in stix_object
else None
),
x_opencti_cvss_v2_exploitability=(
stix_object["x_opencti_cvss_v2_exploitability"]
if "x_opencti_cvss_v2_exploitability" in stix_object
else None
),
x_opencti_cvss_v2_remediation_level=(
stix_object["x_opencti_cvss_v2_remediation_level"]
if "x_opencti_cvss_v2_remediation_level" in stix_object
else None
),
x_opencti_cvss_v2_report_confidence=(
stix_object["x_opencti_cvss_v2_report_confidence"]
if "x_opencti_cvss_v2_report_confidence" in stix_object
else None
),
x_opencti_cvss_v2_temporal_score=(
stix_object["x_opencti_cvss_v2_temporal_score"]
if "x_opencti_cvss_v2_temporal_score" in stix_object
else None
),
# CVSS4
x_opencti_cvss_v4_vector_string=(
stix_object["x_opencti_cvss_v4_vector_string"]
if "x_opencti_cvss_v4_vector_string" in stix_object
else None
),
x_opencti_cvss_v4_base_score=(
stix_object["x_opencti_cvss_v4_base_score"]
if "x_opencti_cvss_v4_base_score" in stix_object
else None
),
x_opencti_cvss_v4_base_severity=(
stix_object["x_opencti_cvss_v4_base_severity"]
if "x_opencti_cvss_v4_base_severity" in stix_object
else None
),
x_opencti_cvss_v4_attack_vector=(
stix_object["x_opencti_cvss_v4_attack_vector"]
if "x_opencti_cvss_v4_attack_vector" in stix_object
else None
),
x_opencti_cvss_v4_attack_complexity=(
stix_object["x_opencti_cvss_v4_attack_complexity"]
if "x_opencti_cvss_v4_attack_complexity" in stix_object
else None
),
x_opencti_cvss_v4_attack_requirements=(
stix_object["x_opencti_cvss_v4_attack_requirements"]
if "x_opencti_cvss_v4_attack_requirements" in stix_object
else None
),
x_opencti_cvss_v4_privileges_required=(
stix_object["x_opencti_cvss_v4_privileges_required"]
if "x_opencti_cvss_v4_privileges_required" in stix_object
else None
),
x_opencti_cvss_v4_user_interaction=(
stix_object["x_opencti_cvss_v4_user_interaction"]
if "x_opencti_cvss_v4_user_interaction" in stix_object
else None
),
x_opencti_cvss_v4_confidentiality_impact_v=(
stix_object["x_opencti_cvss_v4_confidentiality_impact_v"]
if "x_opencti_cvss_v4_confidentiality_impact_v" in stix_object
else None
),
x_opencti_cvss_v4_confidentiality_impact_s=(
stix_object["x_opencti_cvss_v4_confidentiality_impact_s"]
if "x_opencti_cvss_v4_confidentiality_impact_s" in stix_object
else None
),
x_opencti_cvss_v4_integrity_impact_v=(
stix_object["x_opencti_cvss_v4_integrity_impact_v"]
if "x_opencti_cvss_v4_integrity_impact_v" in stix_object
else None
),
x_opencti_cvss_v4_integrity_impact_s=(
stix_object["x_opencti_cvss_v4_integrity_impact_s"]
if "x_opencti_cvss_v4_integrity_impact_s" in stix_object
else None
),
x_opencti_cvss_v4_availability_impact_v=(
stix_object["x_opencti_cvss_v4_availability_impact_v"]
if "x_opencti_cvss_v4_availability_impact_v" in stix_object
else None
),
x_opencti_cvss_v4_availability_impact_s=(
stix_object["x_opencti_cvss_v4_availability_impact_s"]
if "x_opencti_cvss_v4_availability_impact_s" in stix_object
else None
),
x_opencti_cvss_v4_exploit_maturity=(
stix_object["x_opencti_cvss_v4_exploit_maturity"]
if "x_opencti_cvss_v4_exploit_maturity" in stix_object
else None
),
# Others
x_opencti_cwe=(
stix_object["x_opencti_cwe"]
if "x_opencti_cwe" in stix_object
else None
),
x_opencti_cisa_kev=(
stix_object["x_opencti_cisa_kev"]
if "x_opencti_cisa_kev" in stix_object
else None
),
x_opencti_epss_score=(
stix_object["x_opencti_epss_score"]
if "x_opencti_epss_score" in stix_object
else None
),
x_opencti_epss_percentile=(
stix_object["x_opencti_epss_percentile"]
if "x_opencti_epss_percentile" in stix_object
else None
),
x_opencti_score=(
stix_object["x_opencti_score"]
if "x_opencti_score" in stix_object
else None
),
x_opencti_first_seen_active=(
stix_object["x_opencti_first_seen_active"]
if "x_opencti_first_seen_active" in stix_object
else None
),
update=update,
files=extras.get("files"),
filesMarkings=extras.get("filesMarkings"),
noTriggerImport=extras.get("noTriggerImport", None),
embedded=extras.get("embedded", None),
upsert_operations=(
stix_object["opencti_upsert_operations"]
if "opencti_upsert_operations" in stix_object
else None
),
)
else:
self.opencti.app_logger.error(
"[opencti_vulnerability] Missing parameters: stixObject"
)
return None