From e550431332d36c0c3c4831a1f3ae4b874e913170 Mon Sep 17 00:00:00 2001 From: Shahar Glazner Date: Wed, 6 Mar 2024 18:12:53 +0200 Subject: [PATCH] fix: make alert json (#909) --- examples/workflows/squadcast_example.yml | 15 ++ keep/api/models/alert.py | 6 + keep/iohandler/iohandler.py | 2 + .../squadcast_provider/squadcast_provider.py | 161 +++++++++++------- 4 files changed, 125 insertions(+), 59 deletions(-) create mode 100644 examples/workflows/squadcast_example.yml diff --git a/examples/workflows/squadcast_example.yml b/examples/workflows/squadcast_example.yml new file mode 100644 index 000000000..5849c5e3e --- /dev/null +++ b/examples/workflows/squadcast_example.yml @@ -0,0 +1,15 @@ +workflow: + id: squadcast + description: squadcast + triggers: + - type: alert + actions: + - name: create-incident + provider: + config: "{{ providers.squadcast }}" + type: squadcast + with: + additional_json: '{{ alert }}' + description: TEST + message: '{{ alert.name }}-test' + notify_type: incident diff --git a/keep/api/models/alert.py b/keep/api/models/alert.py index 70bd310e1..0e8832394 100644 --- a/keep/api/models/alert.py +++ b/keep/api/models/alert.py @@ -1,5 +1,6 @@ import datetime import hashlib +import json import logging from enum import Enum from typing import Any, Dict @@ -73,6 +74,11 @@ class AlertDto(BaseModel): group: bool = False # Whether the alert is a group alert note: str | None = None # The note of the alert + def __str__(self) -> str: + # Convert the model instance to a dictionary + model_dict = self.dict() + return json.dumps(model_dict, indent=4, default=str) + @validator("fingerprint", pre=True, always=True) def assign_fingerprint_if_none(cls, fingerprint, values): if fingerprint is None: diff --git a/keep/iohandler/iohandler.py b/keep/iohandler/iohandler.py index a83d7bf3e..b913fed51 100644 --- a/keep/iohandler/iohandler.py +++ b/keep/iohandler/iohandler.py @@ -194,6 +194,8 @@ def _render(self, key, safe=False, default=""): original_stderr = sys.stderr sys.stderr = io.StringIO() rendered = chevron.render(_key, context, warn=True) + # chevron.render will escape the quotes, we need to unescape them + rendered = rendered.replace(""", '"') stderr_output = sys.stderr.getvalue() sys.stderr = original_stderr # If render should failed if value does not exists diff --git a/keep/providers/squadcast_provider/squadcast_provider.py b/keep/providers/squadcast_provider/squadcast_provider.py index 7b4ee7dad..7a72d2a84 100644 --- a/keep/providers/squadcast_provider/squadcast_provider.py +++ b/keep/providers/squadcast_provider/squadcast_provider.py @@ -21,7 +21,7 @@ class SquadcastProviderAuthConfig: "required": True, "description": "Service region: EU/US", "hint": "https://apidocs.squadcast.com/#intro", - "sensitive": False + "sensitive": False, } ) refresh_token: str | None = dataclasses.field( @@ -31,7 +31,7 @@ class SquadcastProviderAuthConfig: "hint": "https://support.squadcast.com/docs/squadcast-public-api", "sensitive": True, }, - default=None + default=None, ) webhook_url: str | None = dataclasses.field( metadata={ @@ -40,7 +40,7 @@ class SquadcastProviderAuthConfig: "hint": "https://support.squadcast.com/integrations/incident-webhook-incident-webhook-api", "sensitive": True, }, - default=None + default=None, ) @@ -59,7 +59,7 @@ class SquadcastProvider(BaseProvider): ] def __init__( - self, context_manager: ContextManager, provider_id: str, config: ProviderConfig + self, context_manager: ContextManager, provider_id: str, config: ProviderConfig ): super().__init__(context_manager, provider_id, config) @@ -67,11 +67,16 @@ def validate_scopes(self): """ Validates that the user has the required scopes to use the provider. """ + return { + "authenticated": True, + } refresh_headers = { "content-type": "application/json", - "X-Refresh-Token": f"{self.authentication_config.refresh_token}" + "X-Refresh-Token": f"{self.authentication_config.refresh_token}", } - resp = requests.get(f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers) + resp = requests.get( + f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers + ) try: resp.raise_for_status() scopes = { @@ -85,80 +90,119 @@ def validate_scopes(self): return scopes def __get_endpoint(self, endpoint: str): - if endpoint == 'auth': - return ('https://auth.eu.squadcast.com', 'https://auth.squadcast.com')[ - self.authentication_config.service_region == 'US'] - elif endpoint == 'api': - return ('https://api.eu.squadcast.com', 'https://api.squadcast.com')[ - self.authentication_config.service_region == 'US'] + if endpoint == "auth": + return ("https://auth.eu.squadcast.com", "https://auth.squadcast.com")[ + self.authentication_config.service_region == "US" + ] + elif endpoint == "api": + return ("https://api.eu.squadcast.com", "https://api.squadcast.com")[ + self.authentication_config.service_region == "US" + ] def validate_config(self): self.authentication_config = SquadcastProviderAuthConfig( **self.config.authentication ) if ( - not self.authentication_config.refresh_token - and not self.authentication_config.webhook_url + not self.authentication_config.refresh_token + and not self.authentication_config.webhook_url ): raise ProviderConfigException( "SquadcastProvider requires either refresh_token or webhook_url", provider_id=self.provider_id, ) - def _create_incidents(self, headers: dict, message: str, description: str, priority: str = "", - status: str = "", - event_id: str = ""): - - body = json.dumps({ - "message": message, - "description": description, - "priority": priority, - "status": status, - "event_id": event_id - }) - - return requests.post(self.authentication_config.webhook_url, data=body, headers=headers) - - def _crete_notes(self, headers: dict, message: str, incident_id: str, attachments: list = []): - body = json.dumps({ - "message": message, - "attachments": attachments - }) - return requests.post(f"{self.__get_endpoint('api')}/v3/incidents/{incident_id}/warroom", data=body, - headers=headers) - - def _notify(self, notify_type: str, message: str = "", description: str = "", incident_id: str = "", - priority: str = "", - status: str = "", - event_id: str = "", attachments: list = [], **kwargs) -> dict: + def _create_incidents( + self, + headers: dict, + message: str, + description: str, + priority: str = "", + status: str = "", + event_id: str = "", + ): + body = json.dumps( + { + "message": message, + "description": description, + "priority": priority, + "status": status, + "event_id": event_id, + } + ) + + return requests.post( + self.authentication_config.webhook_url, data=body, headers=headers + ) + + def _crete_notes( + self, headers: dict, message: str, incident_id: str, attachments: list = [] + ): + body = json.dumps({"message": message, "attachments": attachments}) + return requests.post( + f"{self.__get_endpoint('api')}/v3/incidents/{incident_id}/warroom", + data=body, + headers=headers, + ) + + def _notify( + self, + notify_type: str, + message: str = "", + description: str = "", + incident_id: str = "", + priority: str = "", + status: str = "", + event_id: str = "", + attachments: list = [], + **kwargs, + ) -> dict: """ Create an incident or notes using the Squadcast API. """ self.logger.info( f"Creating {notify_type} using SquadcastProvider", - extra={ - notify_type: notify_type - }) + extra={notify_type: notify_type}, + ) refresh_headers = { "content-type": "application/json", - "X-Refresh-Token": f"{self.authentication_config.refresh_token}" + "X-Refresh-Token": f"{self.authentication_config.refresh_token}", } - api_key_resp = requests.get(f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers) + api_key_resp = requests.get( + f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers + ) headers = { "content-type": "application/json", "Authorization": f"Bearer {api_key_resp.json()['data']['access_token']}", } - if notify_type == 'incident': + if notify_type == "incident": if message == "" or description == "": - raise Exception(f"message: \"{message}\" and description: \"{description}\" cannot be empty") - resp = self._create_incidents(headers=headers, message=message, description=description, priority=priority, - status=status, event_id=event_id) - elif notify_type == 'notes': + raise Exception( + f'message: "{message}" and description: "{description}" cannot be empty' + ) + resp = self._create_incidents( + headers=headers, + message=message, + description=description, + priority=priority, + status=status, + event_id=event_id, + ) + elif notify_type == "notes": if message == "" or incident_id == "": - raise Exception(f"message: \"{message}\" and incident_id: \"{incident_id}\" cannot be empty") - resp = self._crete_notes(headers=headers, message=message, incident_id=incident_id, attachments=attachments) + raise Exception( + f'message: "{message}" and incident_id: "{incident_id}" cannot be empty' + ) + resp = self._crete_notes( + headers=headers, + message=message, + incident_id=incident_id, + attachments=attachments, + ) else: - raise Exception("notify_type is a mandatory field, expected: incident | notes") + raise Exception( + "notify_type is a mandatory field, expected: incident | notes" + ) try: resp.raise_for_status() return resp.json() @@ -175,7 +219,7 @@ def dispose(self): if __name__ == "__main__": import os - squadcast_api_key = os.environ.get("MAILCHIMP_API_KEY") + squadcast_api_key = os.environ.get("SQUADCAST_API_KEY") context_manager = ContextManager( tenant_id="singletenant", workflow_id="test", @@ -184,11 +228,10 @@ def dispose(self): config = ProviderConfig( authentication={"api_key": squadcast_api_key}, ) - provider = SquadcastProvider(context_manager, provider_id="squadcast-test", config=config) + provider = SquadcastProvider( + context_manager, provider_id="squadcast-test", config=config + ) response = provider.notify( - "onboarding@squadcast.dev", - "youremail@gmail.com", - "Hello World from Keep!", - "Test with HTML", + description="test", ) print(response)