Skip to content

Commit

Permalink
refac: Rename workflow back to workflowFetcher
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Dec 6, 2024
1 parent 696a8a5 commit d6e4d3f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
span.Annotate("include_logs", req.IncludeLogs)
span.Annotate("shards", req.Shards)

w := &workflow{
w := &workflowFetcher{
ts: s.ts,
tmc: s.tmc,
parser: s.SQLParser(),
Expand Down
48 changes: 24 additions & 24 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ import (
vttimepb "vitess.io/vitess/go/vt/proto/vttime"
)

// workflow is responsible for fetching and retrieving information
// workflowFetcher is responsible for fetching and retrieving information
// about VReplication workflows.
type workflow struct {
type workflowFetcher struct {
ts *topo.Server
tmc tmclient.TabletManagerClient

Expand Down Expand Up @@ -97,7 +97,7 @@ ORDER BY
id ASC
`)

func (w *workflow) fetchWorkflowsByShard(
func (wf *workflowFetcher) fetchWorkflowsByShard(
ctx context.Context,
req *vtctldatapb.GetWorkflowsRequest,
) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
Expand All @@ -111,15 +111,15 @@ func (w *workflow) fetchWorkflowsByShard(

m := sync.Mutex{}

shards, err := common.GetShards(ctx, w.ts, req.Keyspace, req.Shards)
shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards)
if err != nil {
return nil, err
}

results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards))

err = w.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error {
primary, err := w.ts.GetTablet(ctx, si.PrimaryAlias)
err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error {
primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return err
}
Expand All @@ -128,7 +128,7 @@ func (w *workflow) fetchWorkflowsByShard(
}
// Clone the request so that we can set the correct DB name for tablet.
req := readReq.CloneVT()
wres, err := w.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req)
wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req)
if err != nil {
return err
}
Expand All @@ -144,7 +144,7 @@ func (w *workflow) fetchWorkflowsByShard(
return results, nil
}

func (w *workflow) fetchCopyStatesByShardStream(
func (wf *workflowFetcher) fetchCopyStatesByShardStream(
ctx context.Context,
workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) {
Expand All @@ -153,13 +153,13 @@ func (w *workflow) fetchCopyStatesByShardStream(
copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard))

fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error {
span, ctx := trace.NewSpan(ctx, "workflow.workflow.fetchCopyStates")
span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates")
defer span.Finish()

span.Annotate("shard", tablet.Shard)
span.Annotate("tablet_alias", tablet.AliasString())

copyStates, err := w.getWorkflowCopyStates(ctx, tablet, streamIds)
copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds)
if err != nil {
return err
}
Expand Down Expand Up @@ -202,8 +202,8 @@ func (w *workflow) fetchCopyStatesByShardStream(
return copyStatesByShardStreamId, nil
}

func (w *workflow) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
span, ctx := trace.NewSpan(ctx, "workflow.workflow.getWorkflowCopyStates")
func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates")
defer span.Finish()

span.Annotate("keyspace", tablet.Keyspace)
Expand All @@ -220,7 +220,7 @@ func (w *workflow) getWorkflowCopyStates(ctx context.Context, tablet *topo.Table
if err != nil {
return nil, err
}
qr, err := w.tmc.VReplicationExec(ctx, tablet.Tablet, query)
qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
return nil, err
}
Expand All @@ -247,7 +247,7 @@ func (w *workflow) getWorkflowCopyStates(ctx context.Context, tablet *topo.Table
return copyStates, nil
}

func (w *workflow) buildWorkflows(
func (wf *workflowFetcher) buildWorkflows(
ctx context.Context,
results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *workflow) buildWorkflows(
}

metadata := workflowMetadataMap[workflowName]
err := w.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace)
err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (w *workflow) buildWorkflows(
fetchLogsWG.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer fetchLogsWG.Done()
w.fetchStreamLogs(ctx, req.Keyspace, workflow)
wf.fetchStreamLogs(ctx, req.Keyspace, workflow)
}(ctx, workflow)
}

Expand All @@ -324,7 +324,7 @@ func (w *workflow) buildWorkflows(
return maps.Values(workflowsMap), nil
}

func (w *workflow) scanWorkflow(
func (wf *workflowFetcher) scanWorkflow(
ctx context.Context,
workflow *vtctldatapb.Workflow,
res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse,
Expand All @@ -339,7 +339,7 @@ func (w *workflow) scanWorkflow(
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()

si, err := w.ts.GetShard(ctx, keyspace, tablet.Shard)
si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard)
if err != nil {
return err
}
Expand Down Expand Up @@ -496,8 +496,8 @@ func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMe
workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag)
}

func (w *workflow) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) {
span, ctx := trace.NewSpan(ctx, "workflow.workflow.fetchStreamLogs")
func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) {
span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs")
defer span.Finish()

span.Annotate("keyspace", keyspace)
Expand All @@ -519,7 +519,7 @@ func (w *workflow) fetchStreamLogs(ctx context.Context, keyspace string, workflo
return
}

vx := vexec.NewVExec(keyspace, workflow.Name, w.ts, w.tmc, w.parser)
vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser)
results, err := vx.QueryContext(ctx, query)
if err != nil {
// Note that we do not return here. If there are any query results
Expand Down Expand Up @@ -617,7 +617,7 @@ func (w *workflow) fetchStreamLogs(ctx context.Context, keyspace string, workflo
}

if stream.Id > streamLog.StreamId {
w.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
// This can happen on manual/failed workflow cleanup so move to the next log.
break
}
Expand All @@ -630,7 +630,7 @@ func (w *workflow) fetchStreamLogs(ctx context.Context, keyspace string, workflo
}
}

func (w *workflow) forAllShards(
func (wf *workflowFetcher) forAllShards(
ctx context.Context,
keyspace string,
shards []string,
Expand All @@ -639,7 +639,7 @@ func (w *workflow) forAllShards(
eg, egCtx := errgroup.WithContext(ctx)
for _, shard := range shards {
eg.Go(func() error {
si, err := w.ts.GetShard(ctx, keyspace, shard)
si, err := wf.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtctl/workflow/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestGetWorkflowCopyStates(t *testing.T) {
},
}, sourceShards, targetShards)

w := workflow{
wf := workflowFetcher{
ts: te.ws.ts,
tmc: te.tmc,
}
Expand All @@ -133,7 +133,7 @@ func TestGetWorkflowCopyStates(t *testing.T) {
"1|table1|2", "1|table2|1",
))

copyStates, err := w.getWorkflowCopyStates(ctx, &topo.TabletInfo{
copyStates, err := wf.getWorkflowCopyStates(ctx, &topo.TabletInfo{
Tablet: tablet,
}, []int32{1})
assert.NoError(t, err)
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestFetchCopyStatesByShardStream(t *testing.T) {
},
}, sourceShards, targetShards)

w := workflow{
wf := workflowFetcher{
ts: te.ws.ts,
tmc: te.tmc,
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestFetchCopyStatesByShardStream(t *testing.T) {
},
},
}
copyStatesByStreamId, err := w.fetchCopyStatesByShardStream(ctx, readVReplicationResponse)
copyStatesByStreamId, err := wf.fetchCopyStatesByShardStream(ctx, readVReplicationResponse)
assert.NoError(t, err)

copyStates1 := copyStatesByStreamId["-80/1"]
Expand Down

0 comments on commit d6e4d3f

Please sign in to comment.