Skip to content

Commit

Permalink
fix: uncaught rollback commit on replay (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Oct 17, 2023
1 parent dbe4588 commit a71e0a9
Showing 1 changed file with 16 additions and 22 deletions.
38 changes: 16 additions & 22 deletions internal/store/postgres/scheduler/replay_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,25 @@ func (r ReplayRepository) RegisterReplay(ctx context.Context, replay *scheduler.
if err != nil {
return uuid.Nil, err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
} else {
tx.Commit(ctx)
}
}()

if err := r.insertReplay(ctx, tx, replay); err != nil {
tx.Rollback(ctx)
return uuid.Nil, err
}

storedReplay, err := r.getReplayRequest(ctx, tx, replay)
if err != nil {
tx.Rollback(ctx)
return uuid.Nil, err
}

// TODO: consider to store message of each run
if err := r.insertReplayRuns(ctx, tx, storedReplay.ID, runs); err != nil {
tx.Rollback(ctx)
return uuid.Nil, err
}

tx.Commit(ctx)
return storedReplay.ID, nil
}

Expand All @@ -153,31 +150,29 @@ func (r ReplayRepository) GetReplayToExecute(ctx context.Context) (*scheduler.Re
if err != nil {
return nil, err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
} else {
tx.Commit(ctx)
}
}()

replayRuns, err := r.getExecutableReplayRuns(ctx, tx)
if err != nil {
tx.Rollback(ctx)
return nil, err
}
if replayRuns == nil {
tx.Rollback(ctx)
return nil, errors.NotFound(scheduler.EntityJobRun, "no executable replay request found")
}

storedReplay, err := toReplay(replayRuns)
if err != nil {
tx.Rollback(ctx)
return nil, err
}

// TODO: Avoid having In Progress, but instead use row lock (for update)
if _, err := tx.Exec(ctx, updateReplayRequest, scheduler.ReplayStateInProgress, "", storedReplay.Replay.ID()); err != nil {
tx.Rollback(ctx)
return nil, errors.Wrap(scheduler.EntityJobRun, "unable to update replay", err)
}
tx.Commit(ctx)
return storedReplay, nil
}

Expand Down Expand Up @@ -352,19 +347,18 @@ func (r ReplayRepository) updateReplayRuns(ctx context.Context, id uuid.UUID, ru
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
} else {
tx.Commit(ctx)
}
}()

deleteRuns := `DELETE FROM replay_run WHERE replay_id = $1`
if _, err := tx.Exec(ctx, deleteRuns, id); err != nil {
tx.Rollback(ctx)
return errors.Wrap(scheduler.EntityJobRun, "unable to delete runs of replay", err)
}
return r.insertReplayRuns(ctx, tx, id, runs)
if err := r.insertReplayRuns(ctx, tx, id, runs); err != nil {
tx.Rollback(ctx)
return errors.Wrap(scheduler.EntityJobRun, "unable to insert runs of replay", err)
}
tx.Commit(ctx)
return nil
}

func (ReplayRepository) insertReplay(ctx context.Context, tx pgx.Tx, replay *scheduler.Replay) error {
Expand Down

0 comments on commit a71e0a9

Please sign in to comment.