Skip to content

Commit

Permalink
Improve handling of out of order messages (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
dehort authored Oct 5, 2023
1 parent a9eb50d commit 6889b4f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 1 deletion.
13 changes: 12 additions & 1 deletion internal/response-consumer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func (this *handler) onMessage(ctx context.Context, msg *k.Message) {
Events: eventsSerialized,
}

if updateResult := baseQuery.Select("status", "events").Updates(toUpdate); updateResult.Error != nil {
// Only update if the run is not marked as complete
updateResult := baseQuery.Where("status not in ?", []string{db.RunStatusSuccess, db.RunStatusFailure}).Select("status", "events").Updates(toUpdate)
if updateResult.Error != nil {
utils.GetLogFromContext(ctx).Errorw("Error updating run in db", "error", updateResult.Error)
return updateResult.Error
} else {
Expand Down Expand Up @@ -214,8 +216,17 @@ func satUpdateRecord(ctx context.Context, tx *gorm.DB, responseFull bool, toUpda
}

func createRecord(ctx context.Context, tx *gorm.DB, toCreate []db.RunHost) error {

successOrFailure := clause.OrConditions{Exprs: []clause.Expression{
clause.Eq{Column: "run_hosts.status", Value: db.RunStatusSuccess},
clause.Eq{Column: "run_hosts.status", Value: db.RunStatusFailure},
}}

notMarkedAsComplete := clause.Where{Exprs: []clause.Expression{clause.Not(successOrFailure)}}

createResult := tx.Model(db.RunHost{}).
Clauses(clause.OnConflict{
Where: notMarkedAsComplete,
Columns: []clause.Column{{Name: "run_id"}, {Name: "host"}},
DoUpdates: clause.AssignmentColumns([]string{"status", "log"}),
}).
Expand Down
101 changes: 101 additions & 0 deletions internal/response-consumer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,74 @@ var _ = Describe("handler", func() {
checkHost(data.ID, "success", nil, "", nil)
})

It("successful runner event - ignore out-of-order updates", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())

events := createRunnerEvents(
messageModel.EventExecutorOnStart,
"playbook_on_start",
"playbook_on_play_start",
"playbook_on_task_start",
"runner_on_start",
"runner_on_ok",
"playbook_on_stats",
)

// process the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID))

// verify that the status is marked as complete/success
run := fetchRun(data.ID)
Expect(run.Status).To(Equal("success"))
checkHost(data.ID, "success", nil, "", nil)

// remove the last element from the events slice
incompleteListOfEvents := (*events)[:len(*events)-1]

// process the incomplete set of events after processing the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID))

// verify that processing a incomplete set of events does not overwrite the complete set of events
run = fetchRun(data.ID)
Expect(run.Status).To(Equal("success"))
checkHost(data.ID, "success", nil, "", nil)
})

It("failed runner event - ignore out-of-order updates", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())

events := createRunnerEvents(
messageModel.EventExecutorOnStart,
"playbook_on_start",
"playbook_on_play_start",
"playbook_on_task_start",
"runner_on_start",
"runner_on_failed",
"playbook_on_stats",
)

// process the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID))

// verify that the status is marked as complete/failure
run := fetchRun(data.ID)
Expect(run.Status).To(Equal("failure"))
checkHost(data.ID, "failure", nil, "", nil)

// remove the last element from the events slice
incompleteListOfEvents := (*events)[:len(*events)-1]

// process the incomplete set of events after processing the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID))

// verify that processing a incomplete set of events does not overwrite the complete set of events
run = fetchRun(data.ID)
Expect(run.Status).To(Equal("failure"))
checkHost(data.ID, "failure", nil, "", nil)
})

It("updates the run status based on failure runner events", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())
Expand Down Expand Up @@ -817,6 +885,39 @@ var _ = Describe("handler", func() {
checkHost(data.ID, "success", utils.IntRef(2), "\\n\\u2026\\nsecond console log\nthird console log", &inventoryId)
})

It("success status not overridden by out-of-order event", func() {
var data = test.NewRun(orgId())
data.ResponseFull = false

Expect(db().Create(&data).Error).ToNot(HaveOccurred())

inventoryId := uuid.New()
var hostData = test.NewRunHost(data.ID, "running", &inventoryId)
inventoryIdString := inventoryId.String()

Expect(db().Create(&hostData).Error).ToNot(HaveOccurred())

events := buildSatEvents(
data.CorrelationID,
satPlaybookRunUpdateEvent(1, inventoryIdString, "second console log"),
satPlaybookRunFinishedEvent(inventoryIdString, "success"),
satPlaybookRunCompletedEvent("success"),
)

instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID))

events = buildSatEvents(
data.CorrelationID,
satPlaybookRunUpdateEvent(0, inventoryIdString, "first console log\n"),
)

instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID))

run := fetchRun(data.ID)

Expect(run.Status).To(Equal("success"))
})

It("failed status not overridden by out-of-order event", func() {
var data = test.NewRun(orgId())
data.ResponseFull = false
Expand Down

0 comments on commit 6889b4f

Please sign in to comment.