Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of out of order messages from direct connected hosts #305

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion internal/response-consumer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func (this *handler) onMessage(ctx context.Context, msg *k.Message) {
Events: eventsSerialized,
}

if updateResult := baseQuery.Select("status", "events").Updates(toUpdate); updateResult.Error != nil {
// Only update if the run is not marked as complete
updateResult := baseQuery.Where("status not in ?", []string{db.RunStatusSuccess, db.RunStatusFailure}).Select("status", "events").Updates(toUpdate)
if updateResult.Error != nil {
utils.GetLogFromContext(ctx).Errorw("Error updating run in db", "error", updateResult.Error)
return updateResult.Error
} else {
Expand Down Expand Up @@ -214,8 +216,17 @@ func satUpdateRecord(ctx context.Context, tx *gorm.DB, responseFull bool, toUpda
}

func createRecord(ctx context.Context, tx *gorm.DB, toCreate []db.RunHost) error {

successOrFailure := clause.OrConditions{Exprs: []clause.Expression{
clause.Eq{Column: "run_hosts.status", Value: db.RunStatusSuccess},
clause.Eq{Column: "run_hosts.status", Value: db.RunStatusFailure},
}}

notMarkedAsComplete := clause.Where{Exprs: []clause.Expression{clause.Not(successOrFailure)}}

createResult := tx.Model(db.RunHost{}).
Clauses(clause.OnConflict{
Where: notMarkedAsComplete,
Columns: []clause.Column{{Name: "run_id"}, {Name: "host"}},
DoUpdates: clause.AssignmentColumns([]string{"status", "log"}),
Comment on lines 218 to 231
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dehort Looking good! Any idea if this notMarkedAsComplete check might be redundant, since we are checking for it through the where clause in the main function?

Also, any idea if we need to do the same for satellite requests? I think satellite payloads have a sat_sequence number, which kind of protects us against out of order messages, but might be good to double check. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the notMarkedAsComplete where clause is needed here because we are only passing the transaction object to the createRecord() method. We don't actually pass the query object that we built in the calling function.

Yeah, the sat_sequence field protects us from out-of-order messages at the host level. However, I think the change in this PR also protects us from marking a top-level run as complete or failed and then marking it as running again if we receive a satellite message out of order. I added another test for this.

Thank you, @tahmidefaz !

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, thank you!

}).
Expand Down
101 changes: 101 additions & 0 deletions internal/response-consumer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,74 @@ var _ = Describe("handler", func() {
checkHost(data.ID, "success", nil, "", nil)
})

It("successful runner event - ignore out-of-order updates", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())

events := createRunnerEvents(
messageModel.EventExecutorOnStart,
"playbook_on_start",
"playbook_on_play_start",
"playbook_on_task_start",
"runner_on_start",
"runner_on_ok",
"playbook_on_stats",
)

// process the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID))

// verify that the status is marked as complete/success
run := fetchRun(data.ID)
Expect(run.Status).To(Equal("success"))
checkHost(data.ID, "success", nil, "", nil)

// remove the last element from the events slice
incompleteListOfEvents := (*events)[:len(*events)-1]

// process the incomplete set of events after processing the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID))

// verify that processing a incomplete set of events does not overwrite the complete set of events
run = fetchRun(data.ID)
Expect(run.Status).To(Equal("success"))
checkHost(data.ID, "success", nil, "", nil)
})

It("failed runner event - ignore out-of-order updates", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())

events := createRunnerEvents(
messageModel.EventExecutorOnStart,
"playbook_on_start",
"playbook_on_play_start",
"playbook_on_task_start",
"runner_on_start",
"runner_on_failed",
"playbook_on_stats",
)

// process the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID))

// verify that the status is marked as complete/failure
run := fetchRun(data.ID)
Expect(run.Status).To(Equal("failure"))
checkHost(data.ID, "failure", nil, "", nil)

// remove the last element from the events slice
incompleteListOfEvents := (*events)[:len(*events)-1]

// process the incomplete set of events after processing the complete set of events
instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID))

// verify that processing a incomplete set of events does not overwrite the complete set of events
run = fetchRun(data.ID)
Expect(run.Status).To(Equal("failure"))
checkHost(data.ID, "failure", nil, "", nil)
})

It("updates the run status based on failure runner events", func() {
var data = test.NewRun(orgId())
Expect(db().Create(&data).Error).ToNot(HaveOccurred())
Expand Down Expand Up @@ -817,6 +885,39 @@ var _ = Describe("handler", func() {
checkHost(data.ID, "success", utils.IntRef(2), "\\n\\u2026\\nsecond console log\nthird console log", &inventoryId)
})

It("success status not overridden by out-of-order event", func() {
var data = test.NewRun(orgId())
data.ResponseFull = false

Expect(db().Create(&data).Error).ToNot(HaveOccurred())

inventoryId := uuid.New()
var hostData = test.NewRunHost(data.ID, "running", &inventoryId)
inventoryIdString := inventoryId.String()

Expect(db().Create(&hostData).Error).ToNot(HaveOccurred())

events := buildSatEvents(
data.CorrelationID,
satPlaybookRunUpdateEvent(1, inventoryIdString, "second console log"),
satPlaybookRunFinishedEvent(inventoryIdString, "success"),
satPlaybookRunCompletedEvent("success"),
)

instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID))

events = buildSatEvents(
data.CorrelationID,
satPlaybookRunUpdateEvent(0, inventoryIdString, "first console log\n"),
)

instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID))

run := fetchRun(data.ID)

Expect(run.Status).To(Equal("success"))
})

It("failed status not overridden by out-of-order event", func() {
var data = test.NewRun(orgId())
data.ResponseFull = false
Expand Down
Loading