Skip to content

Commit

Permalink
fix: make alert json (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Mar 6, 2024
1 parent 3f219af commit e550431
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 59 deletions.
15 changes: 15 additions & 0 deletions examples/workflows/squadcast_example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
workflow:
id: squadcast
description: squadcast
triggers:
- type: alert
actions:
- name: create-incident
provider:
config: "{{ providers.squadcast }}"
type: squadcast
with:
additional_json: '{{ alert }}'
description: TEST
message: '{{ alert.name }}-test'
notify_type: incident
6 changes: 6 additions & 0 deletions keep/api/models/alert.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import hashlib
import json
import logging
from enum import Enum
from typing import Any, Dict
Expand Down Expand Up @@ -73,6 +74,11 @@ class AlertDto(BaseModel):
group: bool = False # Whether the alert is a group alert
note: str | None = None # The note of the alert

def __str__(self) -> str:
# Convert the model instance to a dictionary
model_dict = self.dict()
return json.dumps(model_dict, indent=4, default=str)

@validator("fingerprint", pre=True, always=True)
def assign_fingerprint_if_none(cls, fingerprint, values):
if fingerprint is None:
Expand Down
2 changes: 2 additions & 0 deletions keep/iohandler/iohandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ def _render(self, key, safe=False, default=""):
original_stderr = sys.stderr
sys.stderr = io.StringIO()
rendered = chevron.render(_key, context, warn=True)
# chevron.render will escape the quotes, we need to unescape them
rendered = rendered.replace(""", '"')
stderr_output = sys.stderr.getvalue()
sys.stderr = original_stderr
# If render should failed if value does not exists
Expand Down
161 changes: 102 additions & 59 deletions keep/providers/squadcast_provider/squadcast_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class SquadcastProviderAuthConfig:
"required": True,
"description": "Service region: EU/US",
"hint": "https://apidocs.squadcast.com/#intro",
"sensitive": False
"sensitive": False,
}
)
refresh_token: str | None = dataclasses.field(
Expand All @@ -31,7 +31,7 @@ class SquadcastProviderAuthConfig:
"hint": "https://support.squadcast.com/docs/squadcast-public-api",
"sensitive": True,
},
default=None
default=None,
)
webhook_url: str | None = dataclasses.field(
metadata={
Expand All @@ -40,7 +40,7 @@ class SquadcastProviderAuthConfig:
"hint": "https://support.squadcast.com/integrations/incident-webhook-incident-webhook-api",
"sensitive": True,
},
default=None
default=None,
)


Expand All @@ -59,19 +59,24 @@ class SquadcastProvider(BaseProvider):
]

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
):
super().__init__(context_manager, provider_id, config)

def validate_scopes(self):
"""
Validates that the user has the required scopes to use the provider.
"""
return {
"authenticated": True,
}
refresh_headers = {
"content-type": "application/json",
"X-Refresh-Token": f"{self.authentication_config.refresh_token}"
"X-Refresh-Token": f"{self.authentication_config.refresh_token}",
}
resp = requests.get(f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers)
resp = requests.get(
f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers
)
try:
resp.raise_for_status()
scopes = {
Expand All @@ -85,80 +90,119 @@ def validate_scopes(self):
return scopes

def __get_endpoint(self, endpoint: str):
if endpoint == 'auth':
return ('https://auth.eu.squadcast.com', 'https://auth.squadcast.com')[
self.authentication_config.service_region == 'US']
elif endpoint == 'api':
return ('https://api.eu.squadcast.com', 'https://api.squadcast.com')[
self.authentication_config.service_region == 'US']
if endpoint == "auth":
return ("https://auth.eu.squadcast.com", "https://auth.squadcast.com")[
self.authentication_config.service_region == "US"
]
elif endpoint == "api":
return ("https://api.eu.squadcast.com", "https://api.squadcast.com")[
self.authentication_config.service_region == "US"
]

def validate_config(self):
self.authentication_config = SquadcastProviderAuthConfig(
**self.config.authentication
)
if (
not self.authentication_config.refresh_token
and not self.authentication_config.webhook_url
not self.authentication_config.refresh_token
and not self.authentication_config.webhook_url
):
raise ProviderConfigException(
"SquadcastProvider requires either refresh_token or webhook_url",
provider_id=self.provider_id,
)

def _create_incidents(self, headers: dict, message: str, description: str, priority: str = "",
status: str = "",
event_id: str = ""):

body = json.dumps({
"message": message,
"description": description,
"priority": priority,
"status": status,
"event_id": event_id
})

return requests.post(self.authentication_config.webhook_url, data=body, headers=headers)

def _crete_notes(self, headers: dict, message: str, incident_id: str, attachments: list = []):
body = json.dumps({
"message": message,
"attachments": attachments
})
return requests.post(f"{self.__get_endpoint('api')}/v3/incidents/{incident_id}/warroom", data=body,
headers=headers)

def _notify(self, notify_type: str, message: str = "", description: str = "", incident_id: str = "",
priority: str = "",
status: str = "",
event_id: str = "", attachments: list = [], **kwargs) -> dict:
def _create_incidents(
self,
headers: dict,
message: str,
description: str,
priority: str = "",
status: str = "",
event_id: str = "",
):
body = json.dumps(
{
"message": message,
"description": description,
"priority": priority,
"status": status,
"event_id": event_id,
}
)

return requests.post(
self.authentication_config.webhook_url, data=body, headers=headers
)

def _crete_notes(
self, headers: dict, message: str, incident_id: str, attachments: list = []
):
body = json.dumps({"message": message, "attachments": attachments})
return requests.post(
f"{self.__get_endpoint('api')}/v3/incidents/{incident_id}/warroom",
data=body,
headers=headers,
)

def _notify(
self,
notify_type: str,
message: str = "",
description: str = "",
incident_id: str = "",
priority: str = "",
status: str = "",
event_id: str = "",
attachments: list = [],
**kwargs,
) -> dict:
"""
Create an incident or notes using the Squadcast API.
"""
self.logger.info(
f"Creating {notify_type} using SquadcastProvider",
extra={
notify_type: notify_type
})
extra={notify_type: notify_type},
)
refresh_headers = {
"content-type": "application/json",
"X-Refresh-Token": f"{self.authentication_config.refresh_token}"
"X-Refresh-Token": f"{self.authentication_config.refresh_token}",
}
api_key_resp = requests.get(f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers)
api_key_resp = requests.get(
f"{self.__get_endpoint('auth')}/oauth/access-token", headers=refresh_headers
)
headers = {
"content-type": "application/json",
"Authorization": f"Bearer {api_key_resp.json()['data']['access_token']}",
}
if notify_type == 'incident':
if notify_type == "incident":
if message == "" or description == "":
raise Exception(f"message: \"{message}\" and description: \"{description}\" cannot be empty")
resp = self._create_incidents(headers=headers, message=message, description=description, priority=priority,
status=status, event_id=event_id)
elif notify_type == 'notes':
raise Exception(
f'message: "{message}" and description: "{description}" cannot be empty'
)
resp = self._create_incidents(
headers=headers,
message=message,
description=description,
priority=priority,
status=status,
event_id=event_id,
)
elif notify_type == "notes":
if message == "" or incident_id == "":
raise Exception(f"message: \"{message}\" and incident_id: \"{incident_id}\" cannot be empty")
resp = self._crete_notes(headers=headers, message=message, incident_id=incident_id, attachments=attachments)
raise Exception(
f'message: "{message}" and incident_id: "{incident_id}" cannot be empty'
)
resp = self._crete_notes(
headers=headers,
message=message,
incident_id=incident_id,
attachments=attachments,
)
else:
raise Exception("notify_type is a mandatory field, expected: incident | notes")
raise Exception(
"notify_type is a mandatory field, expected: incident | notes"
)
try:
resp.raise_for_status()
return resp.json()
Expand All @@ -175,7 +219,7 @@ def dispose(self):
if __name__ == "__main__":
import os

squadcast_api_key = os.environ.get("MAILCHIMP_API_KEY")
squadcast_api_key = os.environ.get("SQUADCAST_API_KEY")
context_manager = ContextManager(
tenant_id="singletenant",
workflow_id="test",
Expand All @@ -184,11 +228,10 @@ def dispose(self):
config = ProviderConfig(
authentication={"api_key": squadcast_api_key},
)
provider = SquadcastProvider(context_manager, provider_id="squadcast-test", config=config)
provider = SquadcastProvider(
context_manager, provider_id="squadcast-test", config=config
)
response = provider.notify(
"[email protected]",
"[email protected]",
"Hello World from Keep!",
"<strong>Test</strong> with HTML",
description="test",
)
print(response)

0 comments on commit e550431

Please sign in to comment.