Source code for pycti.entities.opencti_role

from typing import Dict, List, Optional


[docs] class Role: """Representation of a role in OpenCTI Roles can have capabilities. Groups have roles, and the combined capabilities of those roles determine what a group of users can do on the platform. Check the properties attribute of the class to understand what default properties are fetched. :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` :type opencti: OpenCTIApiClient """
[docs] def __init__(self, opencti): """Initialize the Role instance. :param opencti: OpenCTI API client instance :type opencti: OpenCTIApiClient """
[docs] self.opencti = opencti
[docs] self.properties = """ id standard_id entity_type parent_types name description created_at updated_at capabilities { id name description } capabilitiesInDraft { id name description } can_manage_sensitive_config """
[docs] def list(self, **kwargs) -> List[Dict]: """Search or list the roles on the server. :param search: Defaults to None. :type search: str, optional :param first: Defaults to 500 Return the first x results from ID or beginning if $after is not specified. :type first: int, optional :param after: Return all results after the given ID, useful for pagination. Ignored if returning all results, defaults to None. :type after: str, optional :param orderBy: Field to order by. Must be one of "name", "created_at", "updated_at", or "_score". Defaults to "name", defaults to "name". :type orderBy: str, optional :param orderMode: Direction to order in, either "asc" or "desc", defaults to "asc". :type orderMode: str, optional :param customAttributes: Defaults to None. Custom attributes to return from query. If None, defaults are used. :type customAttributes: str, optional :param getAll: Defaults to False. Retrieve all results. If true then the "first" param is ignored. :type getAll: bool, optional :param withPagination: Defaults to False Whether to include pagination pageInfo properties in result. :type withPagination: bool, optional :return: List of Python dictionaries with the properties of the role. :rtype: List[Dict] """ search = kwargs.get("search", None) first = kwargs.get("first", 500) after = kwargs.get("after", None) order_by = kwargs.get("orderBy", None) order_mode = kwargs.get("orderMode", None) custom_attributes = kwargs.get("customAttributes", None) get_all = kwargs.get("getAll", False) with_pagination = kwargs.get("withPagination", False) self.opencti.admin_logger.info( "Searching roles matching search term", {"search": search} ) if get_all: first = 100 query = ( """ query RoleList($first: Int, $after: ID, $orderBy: RolesOrdering, $orderMode: OrderingMode, $search: String) { roles(first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, search: $search) { edges { node { """ + (self.properties if custom_attributes is None else custom_attributes) + """ } } pageInfo { startCursor endCursor hasNextPage hasPreviousPage globalCount } } } """ ) result = self.opencti.query( query, { "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, "search": search, }, ) if get_all: final_data = [] data = self.opencti.process_multiple(result["data"]["roles"]) final_data = final_data + data while result["data"]["roles"]["pageInfo"]["hasNextPage"]: after = result["data"]["roles"]["pageInfo"]["endCursor"] result = self.opencti.query( query, { "first": first, "after": after, "orderBy": order_by, "orderMode": order_mode, "search": search, }, ) data = self.opencti.process_multiple(result["data"]["roles"]) final_data = final_data + data return final_data else: return self.opencti.process_multiple( result["data"]["roles"], with_pagination )
[docs] def read(self, **kwargs) -> Optional[Dict]: """Get a role given its ID or a search term One of id or search must be provided. :param id: ID of the role on the platform :type id: str, optional :param search: Search term for a role, e.g. its name :type search: str, optional :param customAttributes: Custom attributes on the role to return :type customAttributes: str, optional :return: Representation of the role :rtype: Optional[Dict] """ id = kwargs.get("id", None) search = kwargs.get("search", None) custom_attributes = kwargs.get("customAttributes", None) if id is not None: self.opencti.admin_logger.info("Reading role", {"id": id}) query = ( """ query RoleRead($id: String!) { role(id: $id) { """ + (self.properties if custom_attributes is None else custom_attributes) + """ } } """ ) result = self.opencti.query(query, {"id": id}) return self.opencti.process_multiple_fields(result["data"]["role"]) elif search is not None: result = self.list(search=search) return result[0] if len(result) > 0 else None else: self.opencti.admin_logger.error( "[opencti_role] Missing parameters: id or search" ) return None
[docs] def delete(self, **kwargs): """Delete a role given its ID :param id: ID for the role on the platform. :type id: str """ id = kwargs.get("id", None) if id is None: self.opencti.admin_logger.error("[opencti_role] Missing parameter: id") return None self.opencti.admin_logger.info("Deleting role", {"id": id}) query = """ mutation RoleDelete($id: ID!) { roleEdit(id: $id) { delete } } """ self.opencti.query(query, {"id": id})
[docs] def create(self, **kwargs) -> Optional[Dict]: """Add a new role to OpenCTI. :param name: Name to assign to the role. :type name: str :param description: Optional. Description of the role, defaults to None. :type description: str, optional :param customAttributes: Custom attributes to return on role :type customAttributes: str, optional :return: Representation of the role. :rtype: Optional[Dict] """ name = kwargs.get("name", None) description = kwargs.get("description", None) custom_attributes = kwargs.get("customAttributes", None) if name is None: self.opencti.admin_logger.error("[opencti_role] Missing parameter: name") return None self.opencti.admin_logger.info( "Creating new role", {"name": name, "description": description} ) query = ( """ mutation RoleCreate($input: RoleAddInput!) { roleAdd(input: $input) { """ + (self.properties if custom_attributes is None else custom_attributes) + """ } } """ ) result = self.opencti.query( query, {"input": {"name": name, "description": description}} ) return self.opencti.process_multiple_fields(result["data"]["roleAdd"])
[docs] def update_field(self, **kwargs) -> Optional[Dict]: """Updates a given role with the given inputs Example of input:: [ { "key": "name", "value": "NewCustomRole" }, { "key": "can_manage_sensitive_config", "value": False } ] :param id: ID for the role on the platform :type id: str :param input: List of EditInput objects :type input: List[Dict] :param customAttributes: Custom attributes to return on the role :type customAttributes: str, optional :return: Representation of the role :rtype: Optional[Dict] """ id = kwargs.get("id", None) input = kwargs.get("input", None) custom_attributes = kwargs.get("customAttributes", None) if id is None or input is None: self.opencti.admin_logger.error( "[opencti_role] Missing parameters: id and input" ) return None self.opencti.admin_logger.info( "Editing role with input", {"id": id, "input": input} ) query = ( """ mutation RoleUpdate($id: ID!, $input: [EditInput]!) { roleEdit(id: $id) { fieldPatch(input: $input) { """ + (self.properties if custom_attributes is None else custom_attributes) + """ } } } """ ) result = self.opencti.query(query, {"id": id, "input": input}) return self.opencti.process_multiple_fields( result["data"]["roleEdit"]["fieldPatch"] )
[docs] def add_capability(self, **kwargs) -> Optional[Dict]: """Adds a capability to a role :param id: ID of the role. :type id: str :param capability_id: ID of the capability to add. :type capability_id: str :return: Representation of the relationship, including the role and capability :rtype: Optional[Dict] """ id = kwargs.get("id", None) capability_id = kwargs.get("capability_id", None) if id is None or capability_id is None: self.opencti.admin_logger.error( "[opencti_role] Missing parameters: id and capability_id" ) return None self.opencti.admin_logger.info( "Adding capability to role", {"roleId": id, "capabilityId": capability_id} ) query = ( """ mutation RoleEditAddCapability($id: ID!, $input: InternalRelationshipAddInput!) { roleEdit(id: $id) { relationAdd(input: $input) { id entity_type parent_types created_at updated_at from { ... on Role { """ + self.properties + """ } } to { ... on Capability { id, name, description } } } } } """ ) result = self.opencti.query( query, { "id": id, "input": {"relationship_type": "has-capability", "toId": capability_id}, }, ) return self.opencti.process_multiple_fields( result["data"]["roleEdit"]["relationAdd"] )
[docs] def delete_capability(self, **kwargs) -> Optional[Dict]: """Removes a capability from a role :param id: ID of the role :type id: str :param capability_id: ID of the capability to remove :type capability_id: str :return: Representation of the role after removing the capability :rtype: Optional[Dict] """ id = kwargs.get("id", None) capability_id = kwargs.get("capability_id", None) if id is None or capability_id is None: self.opencti.admin_logger.error( "[opencti_role] Missing parameters: id and capability_id" ) return None self.opencti.admin_logger.info( "Removing capability from role", {"roleId": id, "capabilityId": capability_id}, ) query = ( """ mutation RoleEditDeleteCapability($id: ID!, $toId: StixRef!) { roleEdit(id: $id) { relationDelete(toId: $toId, relationship_type: "has-capability") { """ + self.properties + """ } } } """ ) result = self.opencti.query(query, {"id": id, "toId": capability_id}) return self.opencti.process_multiple_fields( result["data"]["roleEdit"]["relationDelete"] )
[docs] def process_multiple_fields(self, data): """Process and normalize fields in role data. :param data: the role data dictionary to process :type data: dict :return: the processed role data with normalized fields :rtype: dict """ if "capabilities" in data: data["capabilities"] = self.opencti.process_multiple(data["capabilities"]) data["capabilitiesIds"] = self.opencti.process_multiple_ids( data["capabilities"] ) return data