pycti.api.opencti_api_client
============================

.. py:module:: pycti.api.opencti_api_client


Classes
-------

.. autoapisummary::

   pycti.api.opencti_api_client.File
   pycti.api.opencti_api_client.OpenCTIApiClient


Functions
---------

.. autoapisummary::

   pycti.api.opencti_api_client.build_request_headers


Module Contents
---------------

.. py:function:: build_request_headers(token: str, custom_headers: str, app_logger, provider: str)

   Build request headers for OpenCTI API requests.

   :param token: the API authentication token
   :type token: str
   :param custom_headers: custom headers in format "header01:value;header02:value"
   :type custom_headers: str
   :param app_logger: the application logger instance
   :type app_logger: logging.Logger
   :param provider: the provider string for User-Agent header
   :type provider: str
   :return: dictionary of request headers
   :rtype: dict


.. py:class:: File(name, data, mime='text/plain')

   File object for OpenCTI file uploads.

   Represents a file to be uploaded via the OpenCTI API.

   :param name: the filename
   :type name: str
   :param data: the file content (string or bytes)
   :type data: str or bytes
   :param mime: the MIME type of the file, defaults to "text/plain"
   :type mime: str, optional

   Initialize the File instance.

   :param name: the filename
   :type name: str
   :param data: the file content
   :type data: str or bytes
   :param mime: the MIME type of the file (default: "text/plain")
   :type mime: str

   .. py:attribute:: name

   .. py:attribute:: data

   .. py:attribute:: mime
      :value: 'text/plain'


.. py:class:: OpenCTIApiClient(url: str, token: str, log_level: str = 'info', ssl_verify: Union[bool, str] = False, proxies: Union[Dict[str, str], None] = None, json_logging: bool = False, bundle_send_to_queue: bool = True, cert: Union[str, Tuple[str, str], None] = None, custom_headers: Optional[str] = None, perform_health_check: bool = True, requests_timeout: int = 300, provider: Optional[str] = None)

   Main API client for OpenCTI.

   :param url: OpenCTI API url
   :type url: str
   :param token: OpenCTI API token
   :type token: str
   :param log_level: log level for the client
   :type log_level: str, optional
   :param ssl_verify: whether requests verify the TLS certificate presented by the server; the default is False, which leaves verification off
   :type ssl_verify: bool, str, optional
   :param proxies: proxy configuration with "http" and "https" keys (e.g., {"http": "http://my_proxy:8080", "https": "http://my_proxy:8080"})
   :type proxies: dict, optional
   :param json_logging: format the logs as JSON if set to True
   :type json_logging: bool, optional
   :param bundle_send_to_queue: whether bundles are sent to the queue
   :type bundle_send_to_queue: bool, optional
   :param cert: if a string, the file path to a PEM file; if a tuple, a ('path_to_cert.crt', 'path_to_key.key') pair representing the certificate and the key
   :type cert: str, tuple, optional
   :param custom_headers: custom headers to use with the GraphQL queries; must be in the format header01:value;header02:value
   :type custom_headers: str, optional
   :param perform_health_check: whether client initialization checks the API access
   :type perform_health_check: bool, optional
   :param requests_timeout: the timeout for API requests in seconds
   :type requests_timeout: int, optional
   :param provider: the client provider, used to identify the client in the User-Agent header of requests
   :type provider: str, optional

   Initialize the OpenCTIApiClient instance.

   :param url: OpenCTI platform URL
   :type url: str
   :param token: OpenCTI API authentication token
   :type token: str
   :param log_level: logging level (default: "info")
   :type log_level: str
   :param ssl_verify: SSL certificate verification setting
   :type ssl_verify: Union[bool, str]
   :param proxies: proxy configuration dictionary with "http" and "https" keys
   :type proxies: Dict[str, str] or None
   :param json_logging: whether to format logs as JSON (default: False)
   :type json_logging: bool
   :param bundle_send_to_queue: whether bundles are sent to queue (default: True)
   :type bundle_send_to_queue: bool
   :param cert: client certificate path or tuple of (cert, key) paths
   :type cert: str, tuple, or None
   :param custom_headers: custom headers in format "header01:value;header02:value"
   :type custom_headers: str or None
   :param perform_health_check: whether to check API access on init (default: True)
   :type perform_health_check: bool
   :param requests_timeout: timeout for API requests in seconds (default: 300)
   :type requests_timeout: int
   :param provider: client provider for User-Agent header (format: provider/version)
   :type provider: str or None
   :raises ValueError: If URL or token is missing or invalid

   .. py:attribute:: bundle_send_to_queue
      :value: True

   .. py:attribute:: ssl_verify
      :value: False

   .. py:attribute:: cert
      :value: None

   .. py:attribute:: proxies
      :value: None

   .. py:attribute:: logger_class

   .. py:attribute:: app_logger

   .. py:attribute:: admin_logger

   .. py:attribute:: api_token

   .. py:attribute:: api_url

   .. py:attribute:: provider
      :value: None

   .. py:attribute:: request_headers

   .. py:attribute:: session

   .. py:attribute:: session_requests_timeout
      :value: 300

   .. py:attribute:: work

   .. py:attribute:: notification

   .. py:attribute:: trash

   .. py:attribute:: draft

   .. py:attribute:: workspace

   .. py:attribute:: public_dashboard

   .. py:attribute:: playbook

   .. py:attribute:: connector

   .. py:attribute:: stix2

   .. py:attribute:: pir

   .. py:attribute:: internal_file

   .. py:attribute:: file

   .. py:attribute:: vocabulary

   .. py:attribute:: label

   .. py:attribute:: marking_definition

   .. py:attribute:: external_reference

   .. py:attribute:: kill_chain_phase

   .. py:attribute:: opencti_stix_object_or_stix_relationship

   .. py:attribute:: stix

   .. py:attribute:: stix_domain_object

   .. py:attribute:: stix_core_object

   .. py:attribute:: stix_cyber_observable

   .. py:attribute:: stix_core_relationship

   .. py:attribute:: stix_sighting_relationship

   .. py:attribute:: stix_nested_ref_relationship

   .. py:attribute:: identity

   .. py:attribute:: event

   .. py:attribute:: location

   .. py:attribute:: threat_actor

   .. py:attribute:: threat_actor_group

   .. py:attribute:: threat_actor_individual

   .. py:attribute:: intrusion_set

   .. py:attribute:: infrastructure

   .. py:attribute:: campaign

   .. py:attribute:: case_incident

   .. py:attribute:: feedback

   .. py:attribute:: case_rfi

   .. py:attribute:: case_rft

   .. py:attribute:: task

   .. py:attribute:: incident

   .. py:attribute:: malware

   .. py:attribute:: malware_analysis

   .. py:attribute:: tool

   .. py:attribute:: channel

   .. py:attribute:: narrative

   .. py:attribute:: language

   .. py:attribute:: vulnerability

   .. py:attribute:: security_coverage

   .. py:attribute:: attack_pattern

   .. py:attribute:: course_of_action

   .. py:attribute:: data_component

   .. py:attribute:: data_source

   .. py:attribute:: report

   .. py:attribute:: note

   .. py:attribute:: observed_data

   .. py:attribute:: opinion

   .. py:attribute:: grouping

   .. py:attribute:: indicator

   .. py:attribute:: capability

   .. py:attribute:: role

   .. py:attribute:: group

   .. py:attribute:: user

   .. py:attribute:: settings

   .. py:attribute:: draft_id
      :value: ''

   .. py:method:: set_applicant_id_header(applicant_id)

      Set the applicant ID header for impersonation.

      :param applicant_id: the ID of the user to impersonate
      :type applicant_id: str

   .. py:method:: set_playbook_id_header(playbook_id)

      Set the playbook ID header for tracking playbook execution.

      :param playbook_id: the ID of the playbook being executed
      :type playbook_id: str

   .. py:method:: set_event_id(event_id)

      Set the event ID header for event tracking.

      :param event_id: the ID of the event
      :type event_id: str

   .. py:method:: get_draft_id()

      Get the current draft ID.

      :return: the current draft ID or empty string if not set
      :rtype: str

   .. py:method:: set_draft_id(draft_id)

      Set the draft ID header for draft mode operations.

      :param draft_id: the ID of the draft workspace
      :type draft_id: str

   .. py:method:: set_work_id(work_id)

      Set the work ID header for work validation.

      :param work_id: the ID of the work
      :type work_id: str

   .. py:method:: set_synchronized_upsert_header(synchronized)

      Set the synchronized upsert header.

      :param synchronized: whether upsert should be synchronized
      :type synchronized: bool

   .. py:method:: set_previous_standard_header(previous_standard)

      Set the previous standard header for update operations.

      :param previous_standard: the previous standard ID
      :type previous_standard: str

   .. py:method:: get_request_headers(hide_token=True)

      Get a copy of current request headers.

      :param hide_token: if True, masks the Authorization token with asterisks
      :type hide_token: bool
      :return: copy of request headers
      :rtype: dict

   .. py:method:: set_retry_number(retry_number)

      Set the retry number header for tracking retries.

      :param retry_number: the current retry attempt number, or None to clear
      :type retry_number: int or None

   .. py:method:: query(query, variables=None, disable_impersonate=False)

      Submit a query to the OpenCTI GraphQL API.

      :param query: GraphQL query string
      :type query: str
      :param variables: GraphQL query variables, defaults to {}
      :type variables: dict, optional
      :param disable_impersonate: removes impersonate header if set to True, defaults to False
      :type disable_impersonate: bool, optional
      :return: returns the response JSON content
      :rtype: dict
      :raises ValueError: if the API returns an error or non-200 status code

   .. py:method:: fetch_opencti_file(fetch_uri, binary=False, serialize=False)

      Get file from the OpenCTI API.

      :param fetch_uri: download URI to use
      :type fetch_uri: str
      :param binary: if True, returns raw bytes; if False, returns text, defaults to False
      :type binary: bool, optional
      :param serialize: if True, returns base64-encoded content, defaults to False
      :type serialize: bool, optional
      :return: returns either the file content as text, bytes, base64-encoded string, or None on failure
      :rtype: str, bytes, or None

   .. py:method:: health_check()

      Submit an example request to the OpenCTI API.

      :return: returns True if the health check has been successful
      :rtype: bool

   .. py:method:: get_logs_worker_config()

      Get the logs worker configuration from the OpenCTI platform.

      :return: the logs worker configuration including Elasticsearch settings
      :rtype: dict

   .. py:method:: not_empty(value)

      Check if a value is empty for str, list and int.

      :param value: value to check
      :type value: str or list or int or float or bool or datetime.date
      :return: returns True if the value is one of the supported types and not empty
      :rtype: bool

   .. py:method:: process_multiple(data: dict, with_pagination=False) -> Union[dict, list]

      Process data returned by the OpenCTI API with multiple entities.

      :param data: data to process
      :type data: dict
      :param with_pagination: whether to use pagination with the API, defaults to False
      :type with_pagination: bool, optional
      :return: returns either a dict or list with the processed entities
      :rtype: dict or list

   .. py:method:: process_multiple_ids(data) -> list

      Process data returned by the OpenCTI API with multiple ids.

      :param data: data to process
      :type data: list
      :return: returns a list of ids
      :rtype: list

   .. py:method:: process_multiple_fields(data)

      Process data returned by the OpenCTI API with multiple fields.

      :param data: data to process
      :type data: dict
      :return: returns the data dict with all fields processed
      :rtype: dict

   .. py:method:: upload_file(**kwargs)

      Upload a file to the OpenCTI API.

      :param `**kwargs`: arguments for file upload (required: `file_name` and `data`)
      :return: returns the query response for the file upload
      :rtype: dict

   .. py:method:: create_draft(**kwargs)

      Create a draft in OpenCTI API.

      :param draft_name: the name of the draft to create (required)
      :type draft_name: str
      :param entity_id: the entity ID to associate with the draft
      :type entity_id: str, optional
      :return: returns the draft workspace ID
      :rtype: str

   .. py:method:: upload_pending_file(**kwargs)

      Upload a pending file to OpenCTI API.

      :param file_name: the name of the file to upload (required)
      :type file_name: str
      :param data: the file content, defaults to reading from file_name path
      :type data: str or bytes, optional
      :param mime_type: the MIME type of the file, defaults to "text/plain"
      :type mime_type: str, optional
      :param entity_id: the entity ID to associate with the file
      :type entity_id: str, optional
      :param file_markings: list of marking definition IDs to apply
      :type file_markings: list, optional
      :return: returns the query response for the file upload
      :rtype: dict

   .. py:method:: send_bundle_to_api(**kwargs)

      Push a bundle to a queue through OpenCTI API.

      :param connector_id: the connector ID (required)
      :type connector_id: str
      :param bundle: the STIX bundle to push (required)
      :type bundle: str
      :param work_id: the work ID to associate with the bundle
      :type work_id: str, optional
      :return: returns the query response for the bundle push
      :rtype: dict

   .. py:method:: get_stix_content(id)

      Get the STIX content of any entity.

      :param id: the ID of the entity
      :type id: str
      :return: the STIX content in JSON
      :rtype: dict

   .. py:method:: connector_jwt()

   .. py:method:: get_attribute_in_extension(key, stix_object) -> Any
      :staticmethod:

      Get an attribute value from OpenCTI STIX extensions.

      Searches for the key in OpenCTI extension definitions, or falls back to the object's top-level attributes.

      :param key: the attribute key to retrieve
      :type key: str
      :param stix_object: the STIX object containing extensions
      :type stix_object: dict
      :return: the attribute value if found, None otherwise
      :rtype: Any

   .. py:method:: get_attribute_in_mitre_extension(key, stix_object) -> Any
      :staticmethod:

      Get an attribute value from MITRE ATT&CK STIX extension.

      :param key: the attribute key to retrieve
      :type key: str
      :param stix_object: the STIX object containing extensions
      :type stix_object: dict
      :return: the attribute value if found, None otherwise
      :rtype: Any
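
The ``custom_headers`` string format and the ``hide_token`` masking documented above, shown as an added example that is not pycti's own code: a strict parser for ``header01:value;header02:value`` that rejects control characters and refuses to override ``Authorization``, plus a redaction helper for logging. The function names, the reserved-header policy, the ``Bearer`` scheme and the sample values are assumptions made for illustration.

```python
"""Added example (not pycti code): strict handling of a custom_headers string."""
import re

# Characters RFC 9110 allows in a header field name.
_NAME = re.compile(r"[!#$%&'*+\-.^_`|~0-9A-Za-z]+")
# Assumed policy: headers the custom string may not override.
_RESERVED = {"authorization", "content-type", "user-agent"}


def parse_custom_headers(raw):
    """Parse "header01:value;header02:value" into a dict, failing closed."""
    headers = {}
    for pair in (raw or "").split(";"):
        if not pair.strip():
            continue  # tolerate empty input and a trailing ';'
        name, sep, value = pair.partition(":")  # the value may itself contain ':'
        name = name.strip()
        if not sep or not _NAME.fullmatch(name):
            raise ValueError(f"malformed header entry: {pair!r}")
        if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in value):
            raise ValueError(f"control character in value of {name!r}")
        if name.lower() in _RESERVED:
            raise ValueError(f"{name!r} may not be set through custom headers")
        headers[name] = value.strip()
    return headers


def build_headers(token, raw_custom):
    """Auth header plus validated custom headers (Bearer scheme is an assumption)."""
    headers = {"Authorization": "Bearer " + token}
    headers.update(parse_custom_headers(raw_custom))
    return headers


def redact_headers(headers):
    """Return a copy that is safe to log, with the Authorization value masked."""
    return {
        name: "*****" if name.lower() == "authorization" else value
        for name, value in headers.items()
    }


# Constructed sample values, not real credentials.
SAMPLE = build_headers("constructed-token", "X-Tenant:soc;X-Trace:abc:123")
print(redact_headers(SAMPLE))
```

Because ``;`` separates entries, a header value that itself contains a semicolon cannot be expressed in this format.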