Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenant configuration #84

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions asabiris/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ def __init__(self, args=None):
self.JinjaFormatterService = JinjaFormatterService(self)
self.AttachmentRenderingService = AttachmentRenderingService(self)

# Initialize TenantConfigExtractionService if present
if asab.Config.has_section("tenant_config"):
from .tenantconfiguration.tenant_config import TenantConfigExtractionService
self.TenantConfigExtractionService = TenantConfigExtractionService(self)
else:
self.TenantConfigExtractionService = None

# output services
self.EmailOutputService = EmailOutputService(self)

Expand Down
6 changes: 4 additions & 2 deletions asabiris/handlers/kafkahandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ async def handle_exception(self, exception, service_type, msg=None):
elif service_type == 'slack':
try:
L.log(asab.LOG_NOTICE, "Sending error notification to slack.")
await self.App.SlackOutputService.send_message(None, error_message)
tenant = msg.get("tenant", None)
await self.App.SlackOutputService.send_message(None, error_message, tenant)
except ASABIrisError as e:
L.info("Error notification to Slack unsuccessful: Explanation: {}".format(e.TechMessage))
except Exception:
Expand All @@ -270,7 +271,8 @@ async def handle_exception(self, exception, service_type, msg=None):
elif service_type == 'msteams':
try:
L.log(asab.LOG_NOTICE, "Sending error notification to MSTeams.")
await self.App.MSTeamsOutputService.send(error_message)
tenant = msg.get("tenant", None)
await self.App.MSTeamsOutputService.send(error_message, tenant)
except ASABIrisError as e:
L.info("Error notification to MSTeams unsuccessful: Explanation: {}".format(e.TechMessage))
except Exception:
Expand Down
3 changes: 2 additions & 1 deletion asabiris/orchestration/sendmsteams.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async def send_to_msteams(self, msg):

body = msg['body']
template = body["template"]
tenant = msg.get("tenant", None)

if not template.startswith("/Templates/MSTeams/"):
raise ASABIrisError(
Expand All @@ -62,4 +63,4 @@ async def send_to_msteams(self, msg):
params = body.get("params", {})
output = await self.JinjaService.format(template, params)

return await self.MSTeamsOutputService.send(output)
return await self.MSTeamsOutputService.send(output, tenant)
5 changes: 3 additions & 2 deletions asabiris/orchestration/sendslack.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async def send_to_slack(self, msg):
body = msg['body']
template = body["template"]
attachments = msg.get("attachments", None)
tenant = msg.get("tenant", None)
# if params no provided pass empty params
# - primarily use absolute path - starts with "/"
# - if absolute path is used, check it start with "/Templates/Slack"
Expand Down Expand Up @@ -81,14 +82,14 @@ async def send_to_slack(self, msg):
fallback_message = output
blocks = None

await self.SlackOutputService.send_message(blocks, fallback_message)
await self.SlackOutputService.send_message(blocks, fallback_message, tenant)
return

# Sending attachments

output = self.MarkdownFormatterService.unformat(output)
atts_gen = self.AttachmentRenderingService.render_attachment(attachments)
await self.SlackOutputService.send_files(output, atts_gen)
await self.SlackOutputService.send_files(output, atts_gen, tenant)


async def render_attachment(self, template, params):
Expand Down
18 changes: 16 additions & 2 deletions asabiris/output/msteams/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,22 @@ def __init__(self, app, service_name="MSTeamsOutputService"):
L.warning("Configuration section 'msteams' is not provided.")
self.TeamsWebhookUrl = None

async def send(self, body):
if self.TeamsWebhookUrl is None:
# Initialize Tenant Config Service
self.ConfigService = app.get_service("TenantConfigExtractionService")

async def send(self, body, tenant):
webhook_url = self.TeamsWebhookUrl

if tenant:
try:
webhook_url = self.ConfigService.get_msteams_config(tenant)
except KeyError:
L.warning(
"Tenant-specific MS Teams configuration not found for '{}'. Using global config.".format(tenant)
)

if webhook_url is None:
L.error("MS Teams webhook URL is missing.")
return

adaptive_card = {
Expand Down
77 changes: 41 additions & 36 deletions asabiris/output/slack/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,34 @@ class SlackOutputService(asab.Service, OutputABC):
def __init__(self, app, service_name="SlackOutputService"):
super().__init__(app, service_name)

try:
self.SlackWebhookUrl = check_config(asab.Config, "slack", "token")
self.Channel = check_config(asab.Config, "slack", "channel")
self.Client = WebClient(token=self.SlackWebhookUrl)
except configparser.NoOptionError:
L.error("Please provide token and channel in slack configuration section.")
exit()
except configparser.NoSectionError:
L.warning("Configuration section 'slack' is not provided.")
self.SlackWebhookUrl = None


async def send_message(self, blocks, fallback_message) -> None:
# Load global configuration as defaults
self.SlackWebhookUrl = check_config(asab.Config, "slack", "token")
self.Channel = check_config(asab.Config, "slack", "channel")
self.Client = WebClient(token=self.SlackWebhookUrl)
self.ConfigService = app.get_service("TenantConfigExtractionService")

async def send_message(self, blocks, fallback_message, tenant=None) -> None:
"""
Sends a message to a Slack channel.
"""
token, channel = (self.SlackWebhookUrl, self.Channel)

See https://api.slack.com/methods/chat.postMessage
if tenant:
try:
token, channel = self.ConfigService.get_slack_config(tenant)
except KeyError:
L.warning("Tenant-specific Slack configuration not found for '{}'. Using global config.".format(tenant))

"""
if self.Channel is None:
if channel is None:
raise ValueError("Cannot send message to Slack. Reason: Missing Slack channel")

if self.SlackWebhookUrl is None:
if token is None:
raise ValueError("Cannot send message to Slack. Reason: Missing Webhook URL or token")


# TODO: This could be a blocking operation, launch it in the proactor service
try:
channel_id = self.get_channel_id(self.Channel)
self.Client.chat_postMessage(
client = WebClient(token=token)
channel_id = self.get_channel_id(client, channel)
client.chat_postMessage(
channel=channel_id,
text=fallback_message,
blocks=blocks
Expand All @@ -71,23 +69,32 @@ async def send_message(self, blocks, fallback_message) -> None:
"error_message": str(e)
}
)
L.log(asab.LOG_NOTICE, "Slack message sent successfully.", struct_data={'channel': self.Channel})
L.log(asab.LOG_NOTICE, "Slack message sent successfully.", struct_data={'channel': channel})

async def send_files(self, body: str, atts_gen):
async def send_files(self, body: str, atts_gen, tenant=None):
"""
Sends a message to a Slack channel with attachments.
"""
if self.Channel is None:
token, channel = (self.SlackWebhookUrl, self.Channel)

if tenant:
try:
token, channel = self.ConfigService.get_slack_config(tenant)
except KeyError:
L.warning("Tenant-specific Slack configuration not found for '{}'. Using global config.".format(tenant))

if channel is None:
raise ValueError("Cannot send message to Slack. Reason: Missing Slack channel")

if self.SlackWebhookUrl is None:
if token is None:
raise ValueError("Cannot send message to Slack. Reason: Missing Webhook URL or token")

channel_id = self.get_channel_id(self.Channel)
client = WebClient(token=token)
channel_id = self.get_channel_id(client, channel)

try:
async for attachment in atts_gen:
# TODO: This could be a blocking operation, launch it in the proactor service
self.Client.files_upload_v2(
client.files_upload_v2(
channel=channel_id,
file=attachment.Content,
filename=attachment.FileName,
Expand All @@ -103,17 +110,15 @@ async def send_files(self, body: str, atts_gen):
"error_message": str(e)
}
)

L.log(asab.LOG_NOTICE, "Slack message sent successfully.")


def get_channel_id(self, channel_name, types="public_channel"):
# TODO: Cache the channel_id to limit number of requests to Slack API

# TODO: This could be a blocking operation, launch it in the proactor service
for response in self.Client.conversations_list(types=types):
def get_channel_id(self, client, channel_name, types="public_channel"):
"""
Fetches Slack channel ID from Slack API.
"""
for response in client.conversations_list(types=types):
for channel in response['channels']:
if channel['name'] == channel_name:
return channel['id']

raise KeyError("Slack channel not found.")
raise KeyError("Slack channel '{}' not found.".format(channel_name))
5 changes: 5 additions & 0 deletions asabiris/tenantconfiguration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .tenant_config import TenantConfigExtractionService

__all__ = [
"TenantConfigExtractionService"
]
74 changes: 74 additions & 0 deletions asabiris/tenantconfiguration/tenant_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import logging
import json
import configparser
import urllib.parse
import kazoo.client
import asab

L = logging.getLogger(__name__)


class TenantConfigExtractionService(asab.Service):

def __init__(self, app, service_name="TenantConfigExtractionService"):
super().__init__(app, service_name)

# Initialize ZooKeeper client only if configuration exists
self.TenantConfigPath = None
self.zk = None

# Try to read tenant config from asab.Config
try:
tenant_config_url = asab.Config.get("tenant_config", "url")
# Parse the ZooKeeper URL
url_parts = urllib.parse.urlparse(tenant_config_url)
self.TenantConfigPath = url_parts.path
self.zk_hosts = url_parts.netloc

# Initialize Kazoo client
self.zk = kazoo.client.KazooClient(hosts=self.zk_hosts)
self.zk.start()
L.info("ZooKeeper client initialized for tenant configuration.")

except (configparser.NoOptionError, configparser.NoSectionError):
L.warning("Tenant configuration not provided. Proceeding without ZooKeeper integration.")


def load_tenant_config(self, tenant):
"""
Loads tenant-specific configuration from ZooKeeper.
"""
path = "{}/{}".format(self.TenantConfigPath, tenant)
if not self.zk.exists(path):
raise KeyError("Tenant configuration not found at '{}'.".format(path))

data, _ = self.zk.get(path)
config = json.loads(data.decode("utf-8"))
L.info("Loaded tenant configuration from '{}'.".format(path))
return config

def get_slack_config(self, tenant):
"""
Retrieves Slack-specific configuration.
"""
config = self.load_tenant_config(tenant)
try:
slack_config = config["slack"]
token = slack_config["token"]
channel = slack_config["channel"]
L.info("Loaded Slack config for tenant '{}'.".format(tenant))
return token, channel
except KeyError as e:
raise KeyError("Slack configuration missing key: '{}'".format(e))

def get_msteams_config(self, tenant):
"""
Retrieves MS Teams-specific configuration.
"""
config = self.load_tenant_config(tenant)
try:
webhook_url = config["msteams"]["webhook_url"]
L.info("Loaded MS Teams config for tenant '{}'.".format(tenant))
return webhook_url
except KeyError as e:
raise KeyError("MS Teams configuration missing key: '{}'".format(e))