Skip to content

Commit

Permalink
feat: support updating web hook url [MD-482] (#9890)
Browse files Browse the repository at this point in the history
  • Loading branch information
jgongd authored Sep 11, 2024
1 parent 02fbdbb commit de89f68
Show file tree
Hide file tree
Showing 19 changed files with 2,120 additions and 1,203 deletions.
146 changes: 146 additions & 0 deletions e2e_tests/tests/cluster/test_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,149 @@ def test_webhook_rbac() -> None:
bindings.delete_DeleteWebhook(user2_sess, id=workspace_webhook_id)

test_agent_user_group._delete_workspace_and_check(admin_sess, workspace)


@pytest.mark.e2e_cpu
def test_editing_webhook() -> None:
port = 5009
sess = api_utils.admin_session()

webhook_trigger = bindings.v1Trigger(
triggerType=bindings.v1TriggerType.TASK_LOG,
condition={"regex": "test-regex"},
)

default_path = f"/test/path/here/{str(uuid.uuid4())}"
res = bindings.post_PostWebhook(
sess,
body=bindings.v1Webhook(
url=f"http://localhost:{port}{default_path}",
webhookType=bindings.v1WebhookType.DEFAULT,
triggers=[webhook_trigger],
mode=bindings.v1WebhookMode.WORKSPACE,
name="",
workspaceId=None,
),
)
default_id = res.webhook.id
assert default_id is not None

specific_path = f"/test/path/here/{str(uuid.uuid4())}"
res = bindings.post_PostWebhook(
sess,
body=bindings.v1Webhook(
url=f"http://localhost:{port}{specific_path}",
webhookType=bindings.v1WebhookType.DEFAULT,
triggers=[webhook_trigger],
mode=bindings.v1WebhookMode.SPECIFIC,
name="specific-webhook2",
workspaceId=1,
),
)
specific_id = res.webhook.id
assert specific_id is not None

modified_path = f"/test/path/here/{str(uuid.uuid4())}"
bindings.patch_PatchWebhook(
sess,
body=bindings.v1PatchWebhook(url=f"http://localhost:{port}{modified_path}"),
id=default_id,
)
bindings.patch_PatchWebhook(
sess,
body=bindings.v1PatchWebhook(url=f"http://localhost:{port}{modified_path}"),
id=specific_id,
)

get_res = bindings.get_GetWebhooks(sess)
for webhook in get_res.webhooks:
if webhook.id == default_id or webhook.id == specific_id:
assert webhook.url == f"http://localhost:{port}{modified_path}"

bindings.delete_DeleteWebhook(sess, id=default_id)
bindings.delete_DeleteWebhook(sess, id=specific_id)


@pytest.mark.e2e_cpu
def test_log_pattern_webhook_cached_url_is_updated() -> None:
original_port = 5011
updated_port = 5012
original_server = utils.WebhookServer(original_port)
updated_server = utils.WebhookServer(updated_port)
sess = api_utils.admin_session()

regex = r"assert 0 <= self\.metrics_sigma"

webhook_trigger = bindings.v1Trigger(
triggerType=bindings.v1TriggerType.TASK_LOG,
condition={"regex": regex},
)

slack_path = f"/test/slack/path/here/{str(uuid.uuid4())}"
slack_webhook = bindings.post_PostWebhook(
sess,
body=bindings.v1Webhook(
url=f"http://localhost:{original_port}{slack_path}",
webhookType=bindings.v1WebhookType.SLACK,
triggers=[webhook_trigger],
mode=bindings.v1WebhookMode.WORKSPACE,
name="",
workspaceId=None,
),
)
slack_webhook_id = slack_webhook.webhook.id
assert slack_webhook_id is not None

default_path = f"/test/path/here/{str(uuid.uuid4())}"
default_webhook = bindings.post_PostWebhook(
sess,
body=bindings.v1Webhook(
url=f"http://localhost:{original_port}{default_path}",
webhookType=bindings.v1WebhookType.DEFAULT,
triggers=[webhook_trigger],
mode=bindings.v1WebhookMode.WORKSPACE,
name="",
workspaceId=None,
),
)
default_webhook_id = default_webhook.webhook.id
assert default_webhook_id is not None

bindings.patch_PatchWebhook(
sess,
body=bindings.v1PatchWebhook(url=f"http://localhost:{updated_port}{slack_path}"),
id=slack_webhook_id,
)
bindings.patch_PatchWebhook(
sess,
body=bindings.v1PatchWebhook(url=f"http://localhost:{updated_port}{default_path}"),
id=default_webhook_id,
)

exp_id = exp.create_experiment(
sess,
conf.fixtures_path("no_op/single-medium-train-step.yaml"),
conf.fixtures_path("no_op"),
["--config", "hyperparameters.metrics_sigma=-1.0"],
)
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.ERROR)

time.sleep(10)
responses = original_server.close_and_return_responses()
assert len(responses) == 0

for _ in range(10):
responses = updated_server.return_responses()
if default_path in responses and slack_path in responses:
break
time.sleep(1)

responses = updated_server.close_and_return_responses()
assert len(responses) >= 2
# Only need a spot check we get the default / slack responses.
# Further tested in integrations.
assert "TASK_LOG" in responses[default_path]
assert "This log matched the regex" in responses[slack_path]

bindings.delete_DeleteWebhook(sess, id=slack_webhook_id or 0)
bindings.delete_DeleteWebhook(sess, id=default_webhook_id or 0)
48 changes: 48 additions & 0 deletions harness/determined/common/api/bindings.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions master/internal/webhooks/api_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strconv"
"time"

"github.com/google/uuid"
Expand All @@ -14,6 +16,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/determined-ai/determined/master/internal/api"
"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/internal/grpcutil"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
Expand Down Expand Up @@ -256,3 +260,29 @@ func (a *WebhooksAPIServer) PostWebhookEventData(

return &res, nil
}

// PatchWebhook updates a webhook.
func (a *WebhooksAPIServer) PatchWebhook(
ctx context.Context, req *apiv1.PatchWebhookRequest,
) (*apiv1.PatchWebhookResponse, error) {
webhook, err := GetWebhook(ctx, int(req.Id))
if err != nil {
return nil, err
}
if err := authorizeEditRequest(ctx, webhook.Proto().WorkspaceId); err != nil {
return nil, err
}

err = UpdateWebhook(
ctx,
req.Id,
req.Webhook,
)
if err != nil && errors.Is(err, db.ErrNotFound) {
return nil, api.NotFoundErrs("webhook", strconv.Itoa(int(req.Id)), true)
} else if err != nil {
log.WithError(err).Errorf("failed to update webhook %d", req.Id)
return nil, err
}
return &apiv1.PatchWebhookResponse{}, nil
}
59 changes: 59 additions & 0 deletions master/internal/webhooks/postgres_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/determined-ai/determined/master/pkg/ptrs"
"github.com/determined-ai/determined/master/pkg/schemas"
"github.com/determined-ai/determined/master/pkg/schemas/expconf"
"github.com/determined-ai/determined/proto/pkg/webhookv1"

"github.com/google/uuid"
)
Expand Down Expand Up @@ -122,6 +123,26 @@ func (l *WebhookManager) removeTriggers(triggers []*Trigger) error {
return nil
}

func (l *WebhookManager) editTriggers(ts []*Trigger) error {
l.mu.Lock()
defer l.mu.Unlock()

for _, t := range ts {
if t.TriggerType != TriggerTypeTaskLog {
continue
}

regex, ok := t.Condition[regexConditionKey].(string)
if !ok {
return fmt.Errorf(
"expected webhook trigger to have regex in condition instead got %v", t.Condition)
}

l.regexToTriggers[regex].triggerIDToTrigger[t.ID].Webhook.URL = t.Webhook.URL
}
return nil
}

func (l *WebhookManager) getWebhookConfig(ctx context.Context, expID *int) (*expconf.WebhooksConfigV0, error) {
if expID == nil {
return nil, nil
Expand Down Expand Up @@ -901,3 +922,41 @@ WHERE q.id = webhook_events_queue.id RETURNING webhook_events_queue.*
}
return &eventBatch{tx: &tx, events: events}, nil
}

// updateWebhook updates a webhook in the database.
func (l *WebhookManager) updateWebhook(
ctx context.Context,
webhookID int32,
p *webhookv1.PatchWebhook,
) error {
var ts []*Trigger
err := db.Bun().NewSelect().Model(&ts).Relation("Webhook").
Where("webhook_id = ?", webhookID).
Scan(ctx, &ts)
if err != nil {
return fmt.Errorf("getting webhook triggers to update cache: %w", err)
}

err = db.Bun().RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
_, err := tx.NewUpdate().Table("webhooks").
Set("url = ?", p.Url).
Where("id = ?", webhookID).
Exec(ctx)
if err != nil {
return fmt.Errorf("updating webhook %d: %w", webhookID, err)
}

for _, t := range ts {
t.Webhook.URL = p.Url
}
if err := l.editTriggers(ts); err != nil {
return err
}
return nil
})
if err != nil {
return fmt.Errorf("updating webhook: %w", err)
}

return nil
}
11 changes: 11 additions & 0 deletions master/internal/webhooks/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/proto/pkg/webhookv1"
)

var defaultManager *WebhookManager
Expand Down Expand Up @@ -44,3 +45,13 @@ func DeleteWebhook(ctx context.Context, id WebhookID) error {

return defaultManager.deleteWebhook(ctx, id)
}

// UpdateWebhook updates a Webhook in the DB.
func UpdateWebhook(ctx context.Context, id int32, p *webhookv1.PatchWebhook) error {
if defaultManager == nil {
log.Error("webhook manager is uninitialized")
return nil
}

return defaultManager.updateWebhook(ctx, id, p)
}
Binary file modified proto/buf.image.bin
Binary file not shown.
Loading

0 comments on commit de89f68

Please sign in to comment.