# Source code for pycti.entities.opencti_label

# coding: utf-8

import json
import uuid

from stix2.canonicalization.Canonicalize import canonicalize


class Label:
    """Main Label class for OpenCTI.

    Manages labels and tags in the OpenCTI platform. Every method talks to
    the platform through the API client given at construction time
    (``query``, ``process_multiple``, ``process_multiple_fields`` and
    ``app_logger`` are the members used here).

    :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient`
    :type opencti: OpenCTIApiClient
    """

    def __init__(self, opencti):
        """Initialize the Label instance.

        :param opencti: OpenCTI API client instance
        :type opencti: OpenCTIApiClient
        """
        self.opencti = opencti
        # Default GraphQL selection set requested for every label node.
        self.properties = """
            id
            value
            color
            created_at
            updated_at
            standard_id
        """

    @staticmethod
    def generate_id(value):
        """Generate a STIX ID for a Label.

        The ID is a UUIDv5 computed over the canonicalized
        ``{"value": value}`` document inside a fixed namespace UUID, so the
        same value always yields the same ID.

        :param value: The label value
        :type value: str
        :return: STIX ID for the label (``label--<uuid>``)
        :rtype: str
        """
        data = canonicalize({"value": value}, utf8=False)
        namespace = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")
        return "label--" + str(uuid.uuid5(namespace, data))

    def list(self, **kwargs):
        """List Label objects.

        :param filters: the filters to apply
        :type filters: dict
        :param search: the search keyword
        :type search: str
        :param first: return the first n rows from the after ID (or the
            beginning if not set); defaults to 500
        :type first: int
        :param after: ID of the first row for pagination
        :type after: str
        :param orderBy: field to order results by
        :type orderBy: str
        :param orderMode: ordering mode (asc/desc)
        :type orderMode: str
        :param customAttributes: GraphQL selection set to request instead of
            the default properties (it is concatenated into the query text)
        :type customAttributes: str
        :param getAll: whether to follow pagination and retrieve all results
        :type getAll: bool
        :param withPagination: whether to include pagination info (only
            applied when ``getAll`` is false)
        :type withPagination: bool
        :return: List of Label objects
        :rtype: list
        """
        filters = kwargs.get("filters", None)
        search = kwargs.get("search", None)
        first = kwargs.get("first", 500)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("withPagination", False)

        self.opencti.app_logger.info(
            "Listing Labels with filters", {"filters": json.dumps(filters)}
        )
        query = (
            """
            query Labels($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: LabelsOrdering, $orderMode: OrderingMode) {
                labels(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
                            """
            + (custom_attributes if custom_attributes is not None else self.properties)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
        """
        )
        variables = {
            "filters": filters,
            "search": search,
            "first": first,
            "after": after,
            "orderBy": order_by,
            "orderMode": order_mode,
        }
        result = self.opencti.query(query, variables)

        if not get_all:
            return self.opencti.process_multiple(
                result["data"]["labels"], with_pagination
            )

        # Accumulate in place: re-creating the list on every page would copy
        # all previously fetched rows again and again.
        final_data = []
        final_data.extend(self.opencti.process_multiple(result["data"]["labels"]))
        while result["data"]["labels"]["pageInfo"]["hasNextPage"]:
            after = result["data"]["labels"]["pageInfo"]["endCursor"]
            self.opencti.app_logger.debug("Listing Labels", {"after": after})
            # Fresh dict per request so earlier request payloads stay intact.
            result = self.opencti.query(query, {**variables, "after": after})
            final_data.extend(
                self.opencti.process_multiple(result["data"]["labels"])
            )
        return final_data

    def read(self, **kwargs):
        """Read a Label object.

        Looks the label up by ``id`` when given; otherwise lists with
        ``filters`` and returns the first match.

        :param id: the id of the Label
        :type id: str
        :param filters: the filters to apply if no id provided
        :type filters: dict
        :return: Label object, or None when nothing matches or when neither
            ``id`` nor ``filters`` is provided
        :rtype: dict or None
        """
        label_id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)

        if label_id is not None:
            self.opencti.app_logger.info("Reading label", {"id": label_id})
            query = (
                """
                query Label($id: String!) {
                    label(id: $id) {
                        """
                + self.properties
                + """
                    }
                }
            """
            )
            result = self.opencti.query(query, {"id": label_id})
            return self.opencti.process_multiple_fields(result["data"]["label"])

        if filters is not None:
            result = self.list(filters=filters)
            return result[0] if result else None

        self.opencti.app_logger.error(
            "[opencti_label] Missing parameters: id or filters"
        )
        return None

    def create(self, **kwargs):
        """Create a Label object.

        :param stix_id: (optional) the STIX ID
        :type stix_id: str
        :param value: the label value (required)
        :type value: str
        :param color: (optional) the label color
        :type color: str
        :param x_opencti_stix_ids: (optional) list of additional STIX IDs
        :type x_opencti_stix_ids: list
        :param update: (optional) whether to update if exists (default: False)
        :type update: bool
        :return: Label object, or None when ``value`` is missing
        :rtype: dict or None
        """
        stix_id = kwargs.get("stix_id", None)
        value = kwargs.get("value", None)
        color = kwargs.get("color", None)
        x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
        update = kwargs.get("update", False)

        if value is None:
            self.opencti.app_logger.error("[opencti_label] Missing parameters: value")
            return None

        self.opencti.app_logger.info("Creating Label", {"value": value})
        query = (
            """
            mutation LabelAdd($input: LabelAddInput!) {
                labelAdd(input: $input) {
                    """
            + self.properties
            + """
                }
            }
        """
        )
        result = self.opencti.query(
            query,
            {
                "input": {
                    "stix_id": stix_id,
                    "value": value,
                    "color": color,
                    "x_opencti_stix_ids": x_opencti_stix_ids,
                    "update": update,
                }
            },
        )
        return self.opencti.process_multiple_fields(result["data"]["labelAdd"])

    def read_or_create_unchecked(self, **kwargs):
        """Read or create a Label.

        Looks for a label whose ``value`` matches; if none is found, tries
        to create it with the given keyword arguments. A ``ValueError``
        raised by the creation is swallowed and None is returned (the
        upstream documentation describes this as the case where the user
        has no rights to create the label).

        :param value: the label value
        :type value: str
        :return: The available or created Label object, or None when
            ``value`` is missing or creation fails with ``ValueError``
        :rtype: dict or None
        """
        value = kwargs.get("value", None)
        if value is None:
            # Without a value the lookup cannot match and create() would
            # reject the call anyway, so skip the pointless API round-trip.
            self.opencti.app_logger.error("[opencti_label] Missing parameters: value")
            return None

        label = self.read(
            filters={
                "mode": "and",
                "filters": [{"key": "value", "values": [value]}],
                "filterGroups": [],
            }
        )
        if label is not None:
            return label
        try:
            return self.create(**kwargs)
        except ValueError:
            return None

    def update_field(self, **kwargs):
        """Update a Label object field.

        :param id: the Label id
        :type id: str
        :param input: the input of the field
        :type input: list
        :return: The updated Label object, or None when ``id`` or ``input``
            is missing
        :rtype: dict or None
        """
        label_id = kwargs.get("id", None)
        field_input = kwargs.get("input", None)

        if label_id is None or field_input is None:
            self.opencti.app_logger.error(
                "[opencti_label] Missing parameters: id and input"
            )
            return None

        self.opencti.app_logger.info("Updating Label", {"id": label_id})
        query = """
            mutation LabelEdit($id: ID!, $input: [EditInput]!) {
                labelEdit(id: $id) {
                    fieldPatch(input: $input) {
                        id
                        standard_id
                        entity_type
                    }
                }
            }
        """
        result = self.opencti.query(
            query,
            {
                "id": label_id,
                "input": field_input,
            },
        )
        return self.opencti.process_multiple_fields(
            result["data"]["labelEdit"]["fieldPatch"]
        )

    def delete(self, **kwargs):
        """Delete a Label object.

        :param id: the id of the Label to delete
        :type id: str
        :return: None
        """
        label_id = kwargs.get("id", None)

        if label_id is None:
            self.opencti.app_logger.error("[opencti_label] Missing parameters: id")
            return None

        self.opencti.app_logger.info("Deleting Label", {"id": label_id})
        query = """
            mutation LabelEdit($id: ID!) {
                labelEdit(id: $id) {
                    delete
                }
            }
        """
        self.opencti.query(query, {"id": label_id})
        return None