Skip to content

Commit

Permalink
update checkruns with queue status for Terraform jobs where Apply ste…
Browse files Browse the repository at this point in the history
…p has waiting actions
  • Loading branch information
tlin4194 committed Oct 30, 2024
1 parent a4bc2ce commit 5926b53
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type queue interface {
CanPop() bool
Pop() (terraform.DeploymentInfo, error)
SetLockForMergedItems(ctx workflow.Context, state LockState)
GetOrderedMergedItems() []terraform.DeploymentInfo
GetQueuedRevisionsSummary() string
}

Expand Down Expand Up @@ -97,7 +98,7 @@ func NewWorker(
},
}

tfWorkflowRunner := terraform.NewWorkflowRunner(q, tfWorkflow, notifiers, additionalNotifiers...)
tfWorkflowRunner := terraform.NewWorkflowRunner(q, tfWorkflow, githubCheckRunCache, notifiers, additionalNotifiers...)
deployer := &Deployer{
Activities: a,
TerraformWorkflowRunner: tfWorkflowRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func (q *testQueue) Push(msg internalTerraform.DeploymentInfo) {
q.Queue.PushBack(msg)
}

func (q *testQueue) GetOrderedMergedItems() []internalTerraform.DeploymentInfo {
var result []internalTerraform.DeploymentInfo
for e := q.Queue.Front(); e != nil; e = e.Next() {
result = append(result, e.Value.(internalTerraform.DeploymentInfo))
}
return result
}

func (q *testQueue) SetLockForMergedItems(ctx workflow.Context, state queue.LockState) {
q.Lock = state
}
Expand Down
10 changes: 3 additions & 7 deletions server/neptune/workflows/internal/deploy/terraform/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,16 @@ type stateReceiver interface {
}

type deployQueue interface {
// IsEmpty() bool
// CanPop() bool
// Pop() (DeploymentInfo, error)
// Scan() []DeploymentInfo
// GetLockState() queue.LockState
// SetLockForMergedItems(ctx workflow.Context, state queue.LockState)
GetOrderedMergedItems() []DeploymentInfo
GetQueuedRevisionsSummary() string
}

func NewWorkflowRunner(queue deployQueue, w Workflow, internalNotifiers []WorkflowNotifier, additionalNotifiers ...plugins.TerraformWorkflowNotifier) *WorkflowRunner {
func NewWorkflowRunner(queue deployQueue, w Workflow, githubCheckRunCache CheckRunClient, internalNotifiers []WorkflowNotifier, additionalNotifiers ...plugins.TerraformWorkflowNotifier) *WorkflowRunner {
return &WorkflowRunner{
Workflow: w,
StateReceiver: &StateReceiver{
Queue: queue,
CheckRunCache: githubCheckRunCache,
InternalNotifiers: internalNotifiers,
AdditionalNotifiers: additionalNotifiers,
},
Expand Down
34 changes: 32 additions & 2 deletions server/neptune/workflows/internal/deploy/terraform/state.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package terraform

import (
"fmt"
"reflect"

"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/metrics"
"github.com/runatlantis/atlantis/server/neptune/workflows/activities/github"
"github.com/runatlantis/atlantis/server/neptune/workflows/internal/notifier"
"github.com/runatlantis/atlantis/server/neptune/workflows/internal/terraform/state"
"github.com/runatlantis/atlantis/server/neptune/workflows/plugins"
Expand All @@ -15,11 +17,16 @@ type WorkflowNotifier interface {
Notify(workflow.Context, notifier.Info, *state.Workflow) error
}

type CheckRunClient interface {
CreateOrUpdate(ctx workflow.Context, deploymentID string, request notifier.GithubCheckRunRequest) (int64, error)
}

type StateReceiver struct {

// We have separate classes of notifiers since we can be more flexible with our internal ones in terms of the data model
// What we support externally should be well thought out so for now this is kept to a minimum.
Queue deployQueue
CheckRunCache CheckRunClient
InternalNotifiers []WorkflowNotifier
AdditionalNotifiers []plugins.TerraformWorkflowNotifier
}
Expand All @@ -41,8 +48,31 @@ func (n *StateReceiver) Receive(ctx workflow.Context, c workflow.ReceiveChannel,
}
}

if workflowState.Apply.Status == state.WaitingJobStatus && reflect.ValueOf(workflowState.Apply.OnWaitingActions).IsZero() {
// lock the queue item
if workflowState.Apply.Status == state.WaitingJobStatus && !reflect.ValueOf(workflowState.Apply.OnWaitingActions).IsZero() {
// update queue with information about current deployment pending confirm/reject action
infos := n.Queue.GetOrderedMergedItems()

revisionsSummary := n.Queue.GetQueuedRevisionsSummary()
state := github.CheckRunQueued
revisionLink := github.BuildRevisionURLMarkdown(deploymentInfo.Repo.GetFullName(), deploymentInfo.Commit.Revision)
summary := fmt.Sprintf("This deploy is queued pending action on revision %s.\n%s", revisionLink, revisionsSummary)

for _, i := range infos {
request := notifier.GithubCheckRunRequest{
Title: notifier.BuildDeployCheckRunTitle(i.Root.Name),
Sha: i.Commit.Revision,
State: state,
Repo: i.Repo,
Summary: summary,
}

workflow.GetLogger(ctx).Debug(fmt.Sprintf("Updating action pending summary for deployment id: %s", i.ID.String()))
_, err := n.CheckRunCache.CreateOrUpdate(ctx, i.ID.String(), request)

if err != nil {
workflow.GetLogger(ctx).Debug(fmt.Sprintf("updating check run for revision %s", i.Commit.Revision), err)
}
}
}

for _, notifier := range n.InternalNotifiers {
Expand Down
70 changes: 69 additions & 1 deletion server/neptune/workflows/internal/deploy/terraform/state_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package terraform_test

import (
"github.com/runatlantis/atlantis/server/neptune/workflows/internal/notifier"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/runatlantis/atlantis/server/neptune/workflows/internal/notifier"

"github.com/google/uuid"
"github.com/runatlantis/atlantis/server/neptune/workflows/activities/github"
"github.com/runatlantis/atlantis/server/neptune/workflows/activities/terraform"
Expand Down Expand Up @@ -49,13 +52,41 @@ func (n *testExternalNotifier) Notify(ctx workflow.Context, info plugins.Terrafo
return nil
}

type testCheckRunClient struct {
called bool
}

func (t *testCheckRunClient) CreateOrUpdate(ctx workflow.Context, deploymentID string, request notifier.GithubCheckRunRequest) (int64, error) {
t.called = true
return 1, nil
}

type testQueue struct {
Queue []internalTerraform.DeploymentInfo
}

func (q *testQueue) GetQueuedRevisionsSummary() string {
var revisions []string
for _, deploy := range q.Queue {
revisions = append(revisions, deploy.Commit.Revision)
}
return fmt.Sprintf("Revisions in queue: %s", strings.Join(revisions, ", "))
}

func (q *testQueue) GetOrderedMergedItems() []internalTerraform.DeploymentInfo {
return q.Queue
}

type stateReceiveRequest struct {
Queue *testQueue
CheckRunCache testCheckRunClient
State *state.Workflow
DeploymentInfo internalTerraform.DeploymentInfo
T *testing.T
}

type stateReceiveResponse struct {
CheckRunCacheCalled bool
NotifierCalled bool
ExternalNotifierCalled bool
}
Expand All @@ -79,6 +110,8 @@ func testStateReceiveWorkflow(ctx workflow.Context, r stateReceiveRequest) (stat
}

receiver := &internalTerraform.StateReceiver{
Queue: r.Queue,
CheckRunCache: &r.CheckRunCache,
InternalNotifiers: []internalTerraform.WorkflowNotifier{
notifier,
},
Expand All @@ -94,6 +127,7 @@ func testStateReceiveWorkflow(ctx workflow.Context, r stateReceiveRequest) (stat
receiver.Receive(ctx, ch, r.DeploymentInfo)

return stateReceiveResponse{
CheckRunCacheCalled: r.CheckRunCache.called,
NotifierCalled: notifier.called,
ExternalNotifierCalled: externalNotifier.called,
}, nil
Expand All @@ -117,16 +151,49 @@ func TestStateReceive(t *testing.T) {
},
}

queue := &testQueue{
Queue: []internalTerraform.DeploymentInfo{
{
CheckRunID: 0,
ID: uuid.New(),
Root: terraform.Root{Name: "root"},
Repo: github.Repo{Name: "hello"},
Commit: github.Commit{
Revision: "56789",
},
},
},
}

t.Run("calls notifiers with state", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(testStateReceiveWorkflow, stateReceiveRequest{
Queue: queue,
CheckRunCache: testCheckRunClient{},
State: &state.Workflow{
Plan: &state.Job{
Output: jobOutput,
Status: state.WaitingJobStatus,
},
Apply: &state.Job{
Output: jobOutput,
Status: state.WaitingJobStatus,
OnWaitingActions: state.JobActions{
Actions: []state.JobAction{
{
ID: state.ConfirmAction,
Info: "Confirm this plan to proceed to apply",
},
{
ID: state.RejectAction,
Info: "Reject this plan to prevent the apply",
},
},
Summary: "some reason",
},
},
},
DeploymentInfo: internalDeploymentInfo,
T: t,
Expand All @@ -136,6 +203,7 @@ func TestStateReceive(t *testing.T) {

var result stateReceiveResponse
err = env.GetWorkflowResult(&result)
assert.True(t, result.CheckRunCacheCalled)
assert.True(t, result.NotifierCalled)
assert.True(t, result.ExternalNotifierCalled)
assert.NoError(t, err)
Expand Down

0 comments on commit 5926b53

Please sign in to comment.