Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
andrewwdye committed Sep 26, 2023
1 parent a0b97d5 commit dfa2db6
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 10 deletions.
19 changes: 13 additions & 6 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
CreatedAt: input.Request.Event.OccurredAt,
Logs: input.Request.Event.Logs,
CustomInfo: input.Request.Event.CustomInfo,
Reason: input.Request.Event.Reason,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
EventVersion: input.Request.Event.EventVersion,
}

if len(input.Request.Event.Reason) > 0 {
if len(input.Request.Event.Reasons) > 0 {
for _, reason := range input.Request.Event.Reasons {
closure.Reasons = append(closure.Reasons, &admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason
} else if len(input.Request.Event.Reason) > 0 {
closure.Reasons = []*admin.Reason{
&admin.Reason{
{
OccurredAt: input.Request.Event.OccurredAt,
Message: input.Request.Event.Reason,
},
}
closure.Reason = input.Request.Event.Reason
}

eventPhase := input.Request.Event.Phase
Expand Down Expand Up @@ -388,15 +396,14 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append( // TODO: this is where to unpack batch
taskExecutionClosure.Reasons = append(
taskExecutionClosure.Reasons,
&admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
// TODO: avoid dupes?
// taskExecutionClosure.Reason = request.Event.Reason
taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason
} else if len(request.Event.Reason) > 0 {
if taskExecutionClosure.Reason != request.Event.Reason {
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
Expand Down
295 changes: 291 additions & 4 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) {
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task was scheduled",
Reasons: []*admin.Reason{
&admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
Expand Down Expand Up @@ -406,6 +406,93 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) {
}, taskExecutionModel)
}

func TestCreateTaskExecutionModelSingleEvents(t *testing.T) {
taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{
Request: &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_RUNNING,
PhaseVersion: uint32(2),
RetryAttempt: 1,
InputValue: &event.TaskExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.TaskExecutionEvent_OutputUri{
OutputUri: "output uri",
},
OccurredAt: taskEventOccurredAtProto,
Reason: "Task event 1",
},
},
})
assert.Nil(t, err)

expectedClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task event 1",
Reasons: []*admin.Reason{
{OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"},
},
}
expectedClosureBytes, err := proto.Marshal(expectedClosure)
assert.Nil(t, err)
assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure)
}

func TestCreateTaskExecutionModelBatchedEvents(t *testing.T) {
secondTaskEventOccurredAt := taskEventOccurredAt.Add(time.Second)
secondTaskEventOccurredAtProto, _ := ptypes.TimestampProto(secondTaskEventOccurredAt)
taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{
Request: &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_RUNNING,
PhaseVersion: uint32(2),
RetryAttempt: 1,
InputValue: &event.TaskExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.TaskExecutionEvent_OutputUri{
OutputUri: "output uri",
},
OccurredAt: taskEventOccurredAtProto,
Reason: "Task event 1", // Here for backwards compatibility
Reasons: []*event.BatchedReason{
{
OccurredAt: taskEventOccurredAtProto,
Reason: "Task event 1",
},
{
OccurredAt: secondTaskEventOccurredAtProto,
Reason: "Task event 2",
},
},
},
},
})
assert.Nil(t, err)

expectedClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task event 2",
Reasons: []*admin.Reason{
{OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"},
{OccurredAt: secondTaskEventOccurredAtProto, Message: "Task event 2"},
},
}
expectedClosureBytes, err := proto.Marshal(expectedClosure)
assert.Nil(t, err)
assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure)
}

func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
Expand All @@ -425,7 +512,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
}),
Reason: "Task was scheduled",
Reasons: []*admin.Reason{
&admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
Expand Down Expand Up @@ -526,11 +613,11 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
}),
Reason: "task failed",
Reasons: []*admin.Reason{
&admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
&admin.Reason{
{
OccurredAt: occuredAtProto,
Message: "task failed",
},
Expand Down Expand Up @@ -569,6 +656,206 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {

}

func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task was scheduled",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

taskEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_RUNNING,
RetryAttempt: 1,
InputValue: &event.TaskExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.TaskExecutionEvent_OutputUri{
OutputUri: "output uri",
},
OccurredAt: occuredAtProto,
Reason: "update 1",
},
}

err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

expectedClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
UpdatedAt: occuredAtProto,
CreatedAt: taskEventOccurredAtProto,
Reason: "update 1",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
{
OccurredAt: occuredAtProto,
Message: "update 1",
},
},
}

expectedClosureBytes, err := proto.Marshal(expectedClosure)
assert.Nil(t, err)
assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure)
}

func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task was scheduled",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)
secondOccuredAt := taskEventOccurredAt.Add(time.Minute * 2)
secondOccuredAtProto, err := ptypes.TimestampProto(secondOccuredAt)

Check failure on line 799 in pkg/repositories/transformers/task_execution_test.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

ineffectual assignment to err (ineffassign)

taskEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_RUNNING,
RetryAttempt: 1,
InputValue: &event.TaskExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.TaskExecutionEvent_OutputUri{
OutputUri: "output uri",
},
OccurredAt: occuredAtProto,
Reason: "update 1", // Here for backwards compatibility
Reasons: []*event.BatchedReason{
&event.BatchedReason{
OccurredAt: occuredAtProto,
Reason: "update 1",
},
&event.BatchedReason{
OccurredAt: secondOccuredAtProto,
Reason: "update 2",
},
},
},
}

err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

expectedClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
UpdatedAt: occuredAtProto,
CreatedAt: taskEventOccurredAtProto,
Reason: "update 2",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
{
OccurredAt: occuredAtProto,
Message: "update 1",
},
{
OccurredAt: secondOccuredAtProto,
Message: "update 2",
},
},
}

expectedClosureBytes, err := proto.Marshal(expectedClosure)
assert.Nil(t, err)
assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure)
}

func TestFromTaskExecutionModel(t *testing.T) {
taskClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
Expand Down

0 comments on commit dfa2db6

Please sign in to comment.