Commit
SyncFlow: top level heartbeat (#2401)
was seeing heartbeat timeouts
serprex authored Dec 30, 2024
1 parent 50e6a19 commit 60e80b8
Showing 3 changed files with 116 additions and 104 deletions.
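The change moves the heartbeat from the per-batch normalize path up to the top of SyncFlow, so the activity heartbeats for its whole lifetime and the heartbeat details report sync and normalize progress read from atomics. The heartbeatRoutine helper itself is not part of this diff; a minimal sketch of the pattern it implies, assuming Temporal's activity.RecordHeartbeat and an arbitrary 15-second interval, might look like this:

// Sketch only: heartbeatRoutine is referenced by the diff but defined elsewhere
// in the repository. This hypothetical version records a Temporal activity
// heartbeat on a fixed interval, calling a caller-supplied closure to build the
// status string, and returns a shutdown func for the caller to defer.
package activities

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

func heartbeatRoutine(ctx context.Context, status func() string) func() {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(15 * time.Second) // interval is an assumption
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
				activity.RecordHeartbeat(ctx, status())
			}
		}
	}()
	return func() { close(done) }
}

SyncFlow builds the status string inside the closure from atomic values (currentSyncFlowNum, syncingBatchID, normalizingBatchID, and so on), so a single heartbeat goroutine can observe progress from both the sync loop and the normalize goroutine without locks.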
135 changes: 68 additions & 67 deletions flow/activities/flowable.go
@@ -41,13 +41,6 @@ type NormalizeBatchRequest struct {
 	BatchID int64
 }
 
-type CdcState struct {
-	connector connectors.CDCPullConnectorCore
-	syncDone  chan struct{}
-	normalize chan NormalizeBatchRequest
-	errGroup  *errgroup.Group
-}
-
 type FlowableActivity struct {
 	CatalogPool *pgxpool.Pool
 	Alerter     *alerting.Alerter
@@ -249,40 +242,67 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	}, nil
 }
 
-func (a *FlowableActivity) maintainPull(
+func (a *FlowableActivity) SyncFlow(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
-) (CdcState, context.Context, error) {
+	options *protos.SyncFlowOptions,
+) error {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
+	logger := activity.GetLogger(ctx)
+
+	var currentSyncFlowNum atomic.Int32
+	var totalRecordsSynced atomic.Int64
+	var normalizingBatchID atomic.Int64
+	var normalizeWaiting atomic.Bool
+	var syncingBatchID atomic.Int64
+	var syncState atomic.Pointer[string]
+	syncState.Store(shared.Ptr("setup"))
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
+		sBatchID := syncingBatchID.Load()
+		nBatchID := normalizingBatchID.Load()
+		var nWaiting string
+		if normalizeWaiting.Load() {
+			nWaiting = " (W)"
+		}
+		return fmt.Sprintf(
+			"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d (%s), normalizingBatchID:%d%s",
+			currentSyncFlowNum.Load(), totalRecordsSynced.Load(),
+			sBatchID, *syncState.Load(), nBatchID, nWaiting,
+		)
+	})
+	defer shutdown()
+
+	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	if err := srcConn.SetupReplConn(ctx); err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	// syncDone will be closed by SyncFlow,
 	// whereas normalizeDone will be closed by normalizing goroutine
 	// Wait on normalizeDone at end to not interrupt final normalize
 	syncDone := make(chan struct{})
-	normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
+	normRequests := make(chan NormalizeBatchRequest, normalizeBufferSize)
 
 	group, groupCtx := errgroup.WithContext(ctx)
 	group.Go(func() error {
 		// returning error signals sync to stop, normalize can recover connections without interrupting sync, so never return error
-		a.normalizeLoop(groupCtx, config, syncDone, normalize)
+		a.normalizeLoop(groupCtx, logger, config, syncDone, normRequests, &normalizingBatchID, &normalizeWaiting)
 		return nil
 	})
 	group.Go(func() error {
@@ -294,41 +314,17 @@ return nil
 		return nil
 	})
 
-	return CdcState{
-		connector: srcConn,
-		syncDone:  syncDone,
-		normalize: normalize,
-		errGroup:  group,
-	}, groupCtx, nil
-}
-
-func (a *FlowableActivity) SyncFlow(
-	ctx context.Context,
-	config *protos.FlowConnectionConfigs,
-	options *protos.SyncFlowOptions,
-) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	logger := activity.GetLogger(ctx)
-
-	cdcState, groupCtx, err := a.maintainPull(ctx, config)
-	if err != nil {
-		logger.Error("MaintainPull failed", slog.Any("error", err))
-		return err
-	}
-
-	currentSyncFlowNum := int32(0)
-	totalRecordsSynced := int64(0)
-
 	for groupCtx.Err() == nil {
-		currentSyncFlowNum += 1
-		logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))
+		logger.Info("executing sync flow", slog.Int64("count", int64(currentSyncFlowNum.Add(1))))
 
-		var numRecordsSynced int64
+		var syncResponse *model.SyncResponse
 		var syncErr error
 		if config.System == protos.TypeSystem_Q {
-			numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
+				normRequests, &syncingBatchID, &syncState)
 		} else {
-			numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
+				normRequests, &syncingBatchID, &syncState)
 		}
 
 		if syncErr != nil {
@@ -337,21 +333,23 @@ func (a *FlowableActivity) SyncFlow(
 				break
 			}
 			logger.Error("failed to sync records", slog.Any("error", syncErr))
-			close(cdcState.syncDone)
-			return errors.Join(syncErr, cdcState.errGroup.Wait())
+			syncState.Store(shared.Ptr("cleanup"))
+			close(syncDone)
+			return errors.Join(syncErr, group.Wait())
 		} else {
-			totalRecordsSynced += numRecordsSynced
-			logger.Info("synced records",
-				slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
+			totalRecordsSynced.Add(syncResponse.NumRecordsSynced)
+			logger.Info("synced records", slog.Int64("numRecordsSynced", syncResponse.NumRecordsSynced),
+				slog.Int64("totalRecordsSynced", totalRecordsSynced.Load()))
 
-			if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs {
+			if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs {
 				break
 			}
 		}
 	}
 
-	close(cdcState.syncDone)
-	waitErr := cdcState.errGroup.Wait()
+	syncState.Store(shared.Ptr("cleanup"))
+	close(syncDone)
+	waitErr := group.Wait()
 	if err := ctx.Err(); err != nil {
 		logger.Info("sync canceled", slog.Any("error", err))
 		return err
@@ -362,12 +360,15 @@ return nil
 	return nil
 }
 
-func (a *FlowableActivity) SyncRecords(
+func (a *FlowableActivity) syncRecords(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
+	srcConn connectors.CDCPullConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Pointer[string],
+) (*model.SyncResponse, error) {
 	var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
 	if config.Script != "" {
 		var onErr context.CancelCauseFunc
@@ -398,23 +399,28 @@ func (a *FlowableActivity) SyncRecords(
 			return stream, nil
 		}
 	}
-	return syncCore(ctx, a, config, options, cdcState, adaptStream,
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, adaptStream,
 		connectors.CDCPullConnector.PullRecords,
 		connectors.CDCSyncConnector.SyncRecords)
 }
 
-func (a *FlowableActivity) SyncPg(
+func (a *FlowableActivity) syncPg(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
-	return syncCore(ctx, a, config, options, cdcState, nil,
+	srcConn connectors.CDCPullPgConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Pointer[string],
+) (*model.SyncResponse, error) {
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, nil,
 		connectors.CDCPullPgConnector.PullPg,
 		connectors.CDCSyncPgConnector.SyncPg)
 }
 
-func (a *FlowableActivity) StartNormalize(
+func (a *FlowableActivity) startNormalize(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	batchID int64,
@@ -435,11 +441,6 @@ func (a *FlowableActivity) StartNormalize(
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "normalizing records from batch for job"
-	})
-	defer shutdown()
-
 	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
 	if err != nil {
 		return fmt.Errorf("failed to get table name schema mapping: %w", err)
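Not part of the commit, and using hypothetical names rather than the repository's types: a minimal runnable sketch of the syncDone/normRequests choreography the new SyncFlow relies on. The sync side enqueues batch requests on a buffered channel, the normalize goroutine drains it, and closing syncDone after the loop lets a final normalize finish before the errgroup returns.

package main

import (
	"fmt"
	"sync"
)

type normalizeRequest struct{ BatchID int64 }

func main() {
	syncDone := make(chan struct{})
	normRequests := make(chan normalizeRequest, 4) // buffered, like the real channel

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stands in for the normalize goroutine
		defer wg.Done()
		for {
			select {
			case req := <-normRequests:
				fmt.Println("normalizing batch", req.BatchID)
			case <-syncDone:
				// sync finished: drain whatever is still buffered, then exit
				for {
					select {
					case req := <-normRequests:
						fmt.Println("normalizing final batch", req.BatchID)
					default:
						return
					}
				}
			}
		}
	}()

	for batchID := int64(1); batchID <= 3; batchID++ { // stands in for the sync loop
		normRequests <- normalizeRequest{BatchID: batchID}
	}
	close(syncDone) // signal no more batches; any buffered normalize still runs
	wg.Wait()
}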