From a455950e53f8ef7f597ce5796c806531f8fe9a70 Mon Sep 17 00:00:00 2001 From: Vyzaldy Sanchez Date: Mon, 11 Nov 2024 12:51:46 -0400 Subject: [PATCH] Add beholder logging for custom compute (#15122) * Adds beholder logging * Bumps `common` * Bumps `common` * Bumps `common` * Improves var naming * make `gomodtidy` * Repurposes monitoring keys * Fixes CI * Bumps CCIP * Bumps common * Address review comments * Reverts unsupported approach --- core/capabilities/compute/compute.go | 43 ++++++++++---- core/capabilities/compute/monitoring.go | 3 + core/platform/monitoring.go | 15 +++++ core/services/workflows/delegate.go | 3 +- core/services/workflows/engine.go | 75 +++++++++++++------------ core/services/workflows/engine_test.go | 16 +++--- core/services/workflows/monitoring.go | 14 ----- 7 files changed, 99 insertions(+), 70 deletions(-) create mode 100644 core/capabilities/compute/monitoring.go create mode 100644 core/platform/monitoring.go diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 3527199cdb2..7e6961d2e8a 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "strings" "time" @@ -21,8 +22,10 @@ import ( coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/platform" ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" ) @@ -117,7 +120,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe m, ok := c.modules.get(id) if !ok { - mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID) + mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata) if err != nil { return capabilities.CapabilityResponse{}, err } @@ -128,10 +131,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe return c.executeWithModule(ctx, m.module, cfg.Config, request) } -func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) { +func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) { initStart := time.Now() - cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID) + cfg.Fetch = c.createFetcher() mod, err := host.NewModule(cfg, binary) if err != nil { return nil, fmt.Errorf("failed to instantiate WASM module: %w", err) @@ -140,7 +143,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w mod.Start() initDuration := time.Since(initStart) - computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration)) + computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration)) m := &module{module: mod} c.modules.add(id, m) @@ -201,18 +204,26 @@ func (c *Compute) Close() error { return nil } -func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { +func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { - if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { - return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err) + if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil { + return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err) } - if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil { - return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err) + if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionId); err != nil { + return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err) } + cma := c.emitter.With( + platform.KeyWorkflowID, req.Metadata.WorkflowId, + platform.KeyWorkflowName, req.Metadata.WorkflowName, + platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, + platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId, + timestampKey, time.Now().UTC().Format(time.RFC3339Nano), + ) + messageID := strings.Join([]string{ - workflowID, - workflowExecutionID, + req.Metadata.WorkflowId, + req.Metadata.WorkflowExecutionId, ghcapabilities.MethodComputeAction, c.idGenerator(), }, "/") @@ -245,6 +256,16 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx if err != nil { return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err) } + + // Only log if the response is not in the 200 range + if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices { + msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode) + err = cma.Emit(ctx, msg) + if err != nil { + c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) + } + } + return &response, nil } } diff --git a/core/capabilities/compute/monitoring.go b/core/capabilities/compute/monitoring.go new file mode 100644 index 00000000000..4b676c25f7d --- /dev/null +++ b/core/capabilities/compute/monitoring.go @@ -0,0 +1,3 @@ +package compute + +const timestampKey = "computeTimestamp" diff --git a/core/platform/monitoring.go b/core/platform/monitoring.go new file mode 100644 index 00000000000..30221db240c --- /dev/null +++ b/core/platform/monitoring.go @@ -0,0 +1,15 @@ +package platform + +// Observability keys +const ( + KeyCapabilityID = "capabilityID" + KeyTriggerID = "triggerID" + KeyWorkflowID = "workflowID" + KeyWorkflowExecutionID = "workflowExecutionID" + KeyWorkflowName = "workflowName" + KeyWorkflowOwner = "workflowOwner" + KeyStepID = "stepID" + KeyStepRef = "stepRef" +) + +var OrderedLabelKeys = []string{KeyStepRef, KeyStepID, KeyTriggerID, KeyCapabilityID, KeyWorkflowExecutionID, KeyWorkflowID} diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 7cba967115e..72aff3033d0 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/platform" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) @@ -39,7 +40,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil } // ServicesForSpec satisfies the job.Delegate interface. func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { - cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName) + cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName) sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx) if err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index c5890209498..8e2cb8e34cb 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -23,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/platform" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) @@ -167,9 +168,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { for _, t := range e.workflow.triggers { tg, err := e.registry.GetTrigger(ctx, t.ID) if err != nil { - log := e.logger.With(cIDKey, t.ID) + log := e.logger.With(platform.KeyCapabilityID, t.ID) log.Errorf("failed to get trigger capability: %s", err) - logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log) + logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log) // we don't immediately return here, since we want to retry all triggers // to notify the user of all errors at once. triggersInitialized = false @@ -179,7 +180,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { } if !triggersInitialized { return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{ - wIDKey: e.workflow.id, + platform.KeyWorkflowID: e.workflow.id, }} } @@ -201,15 +202,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { if err != nil { logCustMsg( ctx, - e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref), + e.cma.With(platform.KeyWorkflowID, e.workflow.id, platform.KeyStepID, s.ID, platform.KeyStepRef, s.Ref), fmt.Sprintf("failed to initialize capability for step: %s", err), e.logger, ) return &workflowError{err: err, reason: "failed to initialize capability for step", labels: map[string]string{ - wIDKey: e.workflow.id, - sIDKey: s.ID, - sRKey: s.Ref, + platform.KeyWorkflowID: e.workflow.id, + platform.KeyStepID: s.ID, + platform.KeyStepRef: s.Ref, }} } @@ -231,8 +232,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { } return &workflowError{reason: reason, err: err, labels: map[string]string{ - wIDKey: e.workflow.id, - sIDKey: step.ID, + platform.KeyWorkflowID: e.workflow.id, + platform.KeyStepID: step.ID, }} } @@ -318,7 +319,7 @@ func (e *Engine) init(ctx context.Context) { if err != nil { return &workflowError{err: err, reason: "failed to resolve workflow capabilities", labels: map[string]string{ - wIDKey: e.workflow.id, + platform.KeyWorkflowID: e.workflow.id, }} } return nil @@ -341,9 +342,9 @@ func (e *Engine) init(ctx context.Context) { for idx, t := range e.workflow.triggers { terr := e.registerTrigger(ctx, t, idx) if terr != nil { - log := e.logger.With(cIDKey, t.ID) + log := e.logger.With(platform.KeyCapabilityID, t.ID) log.Errorf("failed to register trigger: %s", terr) - logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log) + logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log) } } @@ -451,9 +452,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig // and triggerID might be "wf_123_trigger_0" return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest), labels: map[string]string{ - wIDKey: e.workflow.id, - cIDKey: t.ID, - tIDKey: triggerID, + platform.KeyWorkflowID: e.workflow.id, + platform.KeyCapabilityID: t.ID, + platform.KeyTriggerID: triggerID, }} } @@ -491,7 +492,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig // `executionState`. func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) { defer e.wg.Done() - lggr := e.logger.With(eIDKey, executionID) + lggr := e.logger.With(platform.KeyWorkflowExecutionID, executionID) e.logger.Debugf("running stepUpdateLoop for execution %s", executionID) for { select { @@ -505,11 +506,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd } // Executed synchronously to ensure we correctly schedule subsequent tasks. e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID), - eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) + platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref) err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt) if err != nil { e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err), - eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) + platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref) } } } @@ -532,7 +533,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) { // startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error { - lggr := e.logger.With("event", event, eIDKey, executionID) + lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID) lggr.Debug("executing on a trigger event") ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ @@ -584,8 +585,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event * } func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error { - l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) - cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) + l := e.logger.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref) + cma := e.cma.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref) // If we've been executing for too long, let's time the workflow step out and continue. if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration { @@ -658,7 +659,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) { // If all dependencies are completed, enqueue the step. if !waitingOnDependencies { - e.logger.With(sRKey, step.Ref, eIDKey, state.ExecutionID, "state", copyState(state)). + e.logger.With(platform.KeyStepRef, step.Ref, platform.KeyWorkflowExecutionID, state.ExecutionID, "state", copyState(state)). Debug("step request enqueued") e.pendingStepRequests <- stepRequest{ state: copyState(state), @@ -668,7 +669,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) { } func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error { - e.logger.With(eIDKey, executionID, "status", status).Info("finishing execution") + e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status).Info("finishing execution") metrics := e.metrics.with("status", status) err := e.executionStates.UpdateStatus(ctx, executionID, status) if err != nil { @@ -713,23 +714,23 @@ func (e *Engine) worker(ctx context.Context) { te := resp.Event if te.ID == "" { - e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing") + e.logger.With(platform.KeyTriggerID, te.TriggerType).Error("trigger event ID is empty; not executing") continue } executionID, err := generateExecutionID(e.workflow.id, te.ID) if err != nil { - e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err) + e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err) continue } - cma := e.cma.With(eIDKey, executionID) + cma := e.cma.With(platform.KeyWorkflowExecutionID, executionID) err = e.startExecution(ctx, executionID, resp.Event.Outputs) if err != nil { - e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err) + e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err) logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger) } else { - e.logger.With(eIDKey, executionID).Debug("execution started") + e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started") logCustMsg(ctx, cma, "execution started", e.logger) } case <-ctx.Done(): @@ -741,8 +742,8 @@ func (e *Engine) worker(ctx context.Context) { func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { // Instantiate a child logger; in addition to the WorkflowID field the workflow // logger will already have, this adds the `stepRef` and `executionID` - l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID) - cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID) + l := e.logger.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID) + cma := e.cma.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID) l.Debug("executing on a step event") stepState := &store.WorkflowExecutionStep{ @@ -1104,9 +1105,9 @@ func (e *Engine) Close() error { return &workflowError{err: innerErr, reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg), labels: map[string]string{ - wIDKey: e.workflow.id, - sIDKey: s.ID, - sRKey: s.Ref, + platform.KeyWorkflowID: e.workflow.id, + platform.KeyStepID: s.ID, + platform.KeyStepRef: s.Ref, }} } @@ -1157,7 +1158,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { if cfg.Store == nil { return nil, &workflowError{reason: "store is nil", labels: map[string]string{ - wIDKey: cfg.WorkflowID, + platform.KeyWorkflowID: cfg.WorkflowID, }, } } @@ -1206,7 +1207,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { // - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist) // - etc. - cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName) + cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName) workflow, err := Parse(cfg.Workflow) if err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr) @@ -1220,7 +1221,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { engine = &Engine{ cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)}, + metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, workflow.name)}, registry: cfg.Registry, workflow: workflow, secretsFetcher: cfg.SecretsFetcher, @@ -1268,7 +1269,7 @@ func (e *workflowError) Error() string { } // prefix the error with the labels - for _, label := range orderedLabelKeys { + for _, label := range platform.OrderedLabelKeys { // This will silently ignore any labels that are not present in the map // are we ok with this? if value, ok := e.labels[label]; ok { diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 7837e205d2c..5e87d4f7603 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -20,7 +20,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/platform" gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" @@ -978,26 +980,26 @@ func TestEngine_Error(t *testing.T) { }{ { name: "Error with error and reason", - labels: map[string]string{wIDKey: "my-workflow-id"}, + labels: map[string]string{platform.KeyWorkflowID: "my-workflow-id"}, err: err, reason: "some reason", want: "workflowID my-workflow-id: some reason: some error", }, { name: "Error with error and no reason", - labels: map[string]string{eIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751"}, + labels: map[string]string{platform.KeyWorkflowExecutionID: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751"}, err: err, want: "workflowExecutionID dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751: some error", }, { name: "Error with no error and reason", - labels: map[string]string{cIDKey: "streams-trigger:network_eth@1.0.0"}, + labels: map[string]string{platform.KeyCapabilityID: "streams-trigger:network_eth@1.0.0"}, reason: "some reason", want: "capabilityID streams-trigger:network_eth@1.0.0: some reason", }, { name: "Error with no error and no reason", - labels: map[string]string{tIDKey: "wf_123_trigger_456"}, + labels: map[string]string{platform.KeyTriggerID: "wf_123_trigger_456"}, want: "triggerID wf_123_trigger_456: ", }, { @@ -1010,9 +1012,9 @@ func TestEngine_Error(t *testing.T) { { name: "Multiple labels", labels: map[string]string{ - wIDKey: "my-workflow-id", - eIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751", - cIDKey: "streams-trigger:network_eth@1.0.0", + platform.KeyWorkflowID: "my-workflow-id", + platform.KeyWorkflowExecutionID: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751", + platform.KeyCapabilityID: "streams-trigger:network_eth@1.0.0", }, err: err, reason: "some reason", diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index e2cb4c7259e..d498ff354c9 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -92,17 +92,3 @@ func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Cont otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } - -// Observability keys -const ( - cIDKey = "capabilityID" - tIDKey = "triggerID" - wIDKey = "workflowID" - eIDKey = "workflowExecutionID" - wnKey = "workflowName" - woIDKey = "workflowOwner" - sIDKey = "stepID" - sRKey = "stepRef" -) - -var orderedLabelKeys = []string{sRKey, sIDKey, tIDKey, cIDKey, eIDKey, wIDKey}