import json
import uuid
from stix2.canonicalization.Canonicalize import canonicalize
[docs]
class Vocabulary:
"""Main Vocabulary class for OpenCTI
Manages vocabularies and controlled vocabularies in the OpenCTI platform.
:param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
:type opencti: OpenCTIApiClient
"""
def __init__(self, opencti):
"""Initialize the Vocabulary instance.
:param opencti: OpenCTI API client instance
:type opencti: OpenCTIApiClient
"""
[docs]
self.properties = """
id
name
category {
key
fields {
key
}
}
"""
@staticmethod
[docs]
def generate_id(name, category):
"""Generate a STIX ID for a Vocabulary.
:param name: the name of the Vocabulary
:type name: str
:param category: the category of the Vocabulary
:type category: str
:return: STIX ID for the Vocabulary
:rtype: str
"""
name = name.lower().strip()
data = {"name": name, "category": category}
data = canonicalize(data, utf8=False)
id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data))
return "vocabulary--" + id
@staticmethod
[docs]
def generate_id_from_data(data):
"""Generate a STIX ID from Vocabulary data.
:param data: Dictionary containing 'name' and 'category' keys
:type data: dict
:return: STIX ID for the Vocabulary
:rtype: str
"""
return Vocabulary.generate_id(data["name"], data["category"])
[docs]
def list(self, **kwargs):
"""List Vocabulary objects.
:param filters: the filters to apply
:type filters: dict
:return: List of Vocabulary objects
:rtype: list
"""
filters = kwargs.get("filters", None)
self.opencti.app_logger.info(
"Listing Vocabularies with filters", {"filters": json.dumps(filters)}
)
query = (
"""
query Vocabularies($filters: FilterGroup) {
vocabularies(filters: $filters) {
edges {
node {
"""
+ self.properties
+ """
}
}
}
}
"""
)
result = self.opencti.query(
query,
{
"filters": filters,
},
)
return self.opencti.process_multiple(result["data"]["vocabularies"])
[docs]
def read(self, **kwargs):
"""Read a Vocabulary object.
:param id: the id of the Vocabulary
:type id: str
:param filters: the filters to apply if no id provided
:type filters: dict
:return: Vocabulary object
:rtype: dict or None
"""
id = kwargs.get("id", None)
filters = kwargs.get("filters", None)
if id is not None:
self.opencti.app_logger.info("Reading vocabulary", {"id": id})
query = (
"""
query Vocabulary($id: String!) {
vocabulary(id: $id) {
"""
+ self.properties
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(result["data"]["vocabulary"])
elif filters is not None:
result = self.list(filters=filters)
if len(result) > 0:
return result[0]
else:
return None
else:
self.opencti.app_logger.error(
"[opencti_vocabulary] Missing parameters: id or filters"
)
return None
[docs]
def read_or_create_unchecked_with_cache(self, vocab, cache, field):
"""Read or create a Vocabulary using a cache for optimization.
:param vocab: the vocabulary name
:type vocab: str
:param cache: the cache dictionary
:type cache: dict
:param field: the field configuration containing 'required' and 'key'
:type field: dict
:return: Vocabulary object or None
:rtype: dict or None
"""
if "vocab_" + vocab in cache:
vocab_data = cache["vocab_" + vocab]
else:
vocab_data = self.read_or_create_unchecked(
name=vocab,
required=field["required"],
category=cache["category_" + field["key"]],
)
if vocab_data is not None:
cache["vocab_" + vocab] = vocab_data
return vocab_data
[docs]
def create(self, **kwargs):
"""Create a Vocabulary object.
:param stix_id: (optional) the STIX ID
:type stix_id: str
:param name: the name of the Vocabulary (required)
:type name: str
:param category: the category of the Vocabulary (required)
:type category: str
:param description: (optional) description
:type description: str
:param created: (optional) creation date
:type created: str
:param modified: (optional) modification date
:type modified: str
:param aliases: (optional) list of aliases
:type aliases: list
:param x_opencti_stix_ids: (optional) list of additional STIX IDs
:type x_opencti_stix_ids: list
:param update: (optional) whether to update if exists (default: False)
:type update: bool
:return: Vocabulary object
:rtype: dict or None
"""
stix_id = kwargs.get("stix_id", None)
name = kwargs.get("name", None)
category = kwargs.get("category", None)
description = kwargs.get("description", None)
created = kwargs.get("created", None)
modified = kwargs.get("modified", None)
aliases = kwargs.get("aliases", None)
x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
update = kwargs.get("update", False)
if name is not None and category is not None:
self.opencti.app_logger.info(
"Creating or Getting aliased Vocabulary", {"name": name}
)
query = (
"""
mutation VocabularyAdd($input: VocabularyAddInput!) {
vocabularyAdd(input: $input) {
"""
+ self.properties
+ """
}
}
"""
)
result = self.opencti.query(
query,
{
"input": {
"stix_id": stix_id,
"x_opencti_stix_ids": x_opencti_stix_ids,
"name": name,
"description": description,
"category": category,
"created": created,
"modified": modified,
"aliases": aliases,
"update": update,
}
},
)
return result["data"]["vocabularyAdd"]
else:
self.opencti.app_logger.error(
"[opencti_vocabulary] Missing parameters: name or category",
)
return None
[docs]
def read_or_create_unchecked(self, **kwargs):
"""Read or create a Vocabulary.
If the user has no rights to create the vocabulary, return None.
:param name: the vocabulary name
:type name: str
:param required: whether the vocabulary is required
:type required: bool
:param category: the category of the vocabulary
:type category: str
:return: The available or created Vocabulary object
:rtype: dict or None
"""
value = kwargs.get("name", None)
vocab = self.read(
filters={
"mode": "and",
"filters": [{"key": "name", "values": [value]}],
"filterGroups": [],
}
)
if vocab is None:
try:
return self.create(**kwargs)
except ValueError:
return None
return vocab
[docs]
def update_field(self, **kwargs):
"""Update a Vocabulary object field.
:param id: the Vocabulary id
:type id: str
:param input: the input of the field
:type input: list
:return: The updated Vocabulary object
:rtype: dict or None
"""
id = kwargs.get("id", None)
input = kwargs.get("input", None)
if id is not None and input is not None:
self.opencti.app_logger.info("Updating Vocabulary", {"id": id})
query = """
mutation VocabularyEdit($id: ID!, $input: [EditInput!]!) {
vocabularyFieldPatch(id: $id, input: $input) {
id
standard_id
entity_type
}
}
"""
result = self.opencti.query(
query,
{
"id": id,
"input": input,
},
)
return self.opencti.process_multiple_fields(
result["data"]["vocabularyFieldPatch"]
)
else:
self.opencti.app_logger.error(
"[opencti_vocabulary] Missing parameters: id and input"
)
return None