pcr: quantize PCR frontier timestamps to 5s
Having fewer distinct timestamps makes it easier to merge adjacent spans in the frontier.

Release note: none.
Epic: none.
dt committed Mar 18, 2024
1 parent 0b6de9c commit 230cef6
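
A rough, standalone sketch of the idea (quantizeWallTime and the sample values are invented for illustration; this is not CockroachDB code): rounding each resolved wall time down to a multiple of the 5s granularity collapses nearby timestamps onto a shared boundary, so the frontier sees one value instead of several.

package main

import (
	"fmt"
	"time"
)

// quantizeWallTime rounds a wall-clock timestamp (in nanoseconds) down to an
// even multiple of d, the same arithmetic the diff below adds to
// bufferCheckpoint: WallTime -= WallTime % int64(d).
func quantizeWallTime(wallNanos int64, d time.Duration) int64 {
	if d <= 0 {
		return wallNanos // quantization disabled
	}
	return wallNanos - wallNanos%int64(d)
}

func main() {
	base := time.Date(2024, 3, 18, 12, 0, 0, 0, time.UTC).UnixNano()
	granularity := 5 * time.Second
	// Three spans resolved at slightly different times in the same 5s window
	// all quantize to the same boundary.
	for _, offset := range []time.Duration{1200 * time.Millisecond, 3 * time.Second, 4900 * time.Millisecond} {
		fmt.Println(offset, "->", quantizeWallTime(base+int64(offset), granularity))
	}
}
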
Showing 2 changed files with 44 additions and 16 deletions.
17 changes: 17 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -94,6 +94,13 @@ var cutoverSignalPollInterval = settings.RegisterDurationSetting(
settings.WithName("physical_replication.consumer.cutover_signal_poll_interval"),
)

+ var quantize = settings.RegisterDurationSettingWithExplicitUnit(
+ settings.SystemOnly,
+ "physical_replication.consumer.timestamp_granularity",
+ "the granularity at which replicated times are quantized to make tracking more efficient",
+ 5*time.Second,
+ )

var streamIngestionResultTypes = []*types.T{
types.Bytes, // jobspb.ResolvedSpans
}
@@ -874,7 +881,17 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error

lowestTimestamp := hlc.MaxTimestamp
highestTimestamp := hlc.MinTimestamp
+ d := quantize.Get(&sip.EvalCtx.Settings.SV)
for _, resolvedSpan := range resolvedSpans {
+ // If quantizing is enabled, round the timestamp down to an even multiple of
+ // the quantization amount, to maximize the number of spans that share the
+ // same resolved timestamp -- even if they were individually resolved to
+ // _slightly_ different/newer timestamps -- to allow them to merge into
+ // fewer and larger spans in the frontier.
+ if d > 0 {
+ resolvedSpan.Timestamp.Logical = 0
+ resolvedSpan.Timestamp.WallTime -= resolvedSpan.Timestamp.WallTime % int64(d)
+ }
if resolvedSpan.Timestamp.Less(lowestTimestamp) {
lowestTimestamp = resolvedSpan.Timestamp
}
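Why fewer distinct timestamps matter: adjacent frontier entries can only merge when they carry the same resolved timestamp. A minimal, self-contained sketch of that merging step follows (simplified types; the real frontier tracks roachpb.Span and hlc.Timestamp, not these invented ones).

package main

import "fmt"

// span is a simplified stand-in for a resolved span tracked by the frontier.
type span struct {
	start, end string
	wallNanos  int64
}

// mergeAdjacent coalesces neighboring spans that share a resolved timestamp.
// Quantization maximizes how often this applies: the fewer distinct
// timestamps, the more neighbors fold into a single, larger entry.
func mergeAdjacent(spans []span) []span {
	var out []span
	for _, s := range spans {
		if n := len(out); n > 0 && out[n-1].end == s.start && out[n-1].wallNanos == s.wallNanos {
			out[n-1].end = s.end
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	// Quantized to the same 5s boundary, the two adjacent spans merge into one.
	fmt.Println(mergeAdjacent([]span{{"a", "c", 5_000_000_000}, {"c", "f", 5_000_000_000}}))
	// With slightly different raw timestamps they stay separate.
	fmt.Println(mergeAdjacent([]span{{"a", "c", 5_100_000_000}, {"c", "f", 5_300_000_000}}))
}
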
43 changes: 27 additions & 16 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -203,6 +203,9 @@ func TestStreamIngestionProcessor(t *testing.T) {
},
})
defer tc.Stopper().Stop(ctx)
+ st := cluster.MakeTestingClusterSettings()
+ quantize.Override(ctx, &st.SV, 0)

db := tc.Server(0).InternalDB().(descs.DB)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)

@@ -290,7 +293,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
- mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

emittedRows := readRows(out)
@@ -324,12 +327,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
- sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+ sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
- nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)

require.NoError(t, err)
minimumFlushInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+ quantize.Override(ctx, &st.SV, 0)
out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
out.Start(ctx)
@@ -367,12 +371,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
- sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+ sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
- nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
+ quantize.Override(ctx, &st.SV, 0)
maxKVBufferSize.Override(ctx, &st.SV, 1)
out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
@@ -416,13 +421,15 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
- sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+ sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
- nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
maxRangeKeyBufferSize.Override(ctx, &st.SV, 1)
+ quantize.Override(ctx, &st.SV, 0)

out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
out.Start(ctx)
@@ -472,7 +479,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, checkpoint, tenantRekey, mockClient,
- nil /* cutoverProvider */, streamingTestingKnobs)
+ nil /* cutoverProvider */, streamingTestingKnobs, st)
require.NoError(t, err)

emittedRows := readRows(out)
@@ -499,7 +506,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{},
- nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

// Expect no rows, and just the error.
@@ -680,6 +687,7 @@ func TestRandomClientGeneration(t *testing.T) {
})
defer srv.Stopper().Stop(ctx)

+ quantize.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, 0)
ts := srv.SystemLayer()

registry := ts.JobRegistry().(*jobs.Registry)
@@ -725,9 +733,11 @@ func TestRandomClientGeneration(t *testing.T) {
randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))

+ st := cluster.MakeTestingClusterSettings()
+ quantize.Override(ctx, &st.SV, 0)
out, err := runStreamIngestionProcessor(ctx, t, registry, ts.InternalDB().(descs.DB),
topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
- randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
+ randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/, st)
require.NoError(t, err)

numResolvedEvents := 0
@@ -811,9 +821,10 @@ func runStreamIngestionProcessor(
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
+ st *cluster.Settings,
) (*distsqlutils.RowBuffer, error) {
- sip, _, err := getStreamIngestionProcessor(ctx, t, registry, db,
- partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs)
+ sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
+ partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs, st)
require.NoError(t, err)

out := &distsqlutils.RowBuffer{}
@@ -841,11 +852,11 @@ func getStreamIngestionProcessor(
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
- ) (*streamIngestionProcessor, *cluster.Settings, error) {
- st := cluster.MakeTestingClusterSettings()
+ st *cluster.Settings,
+ ) (*streamIngestionProcessor, error) {
evalCtx := eval.MakeTestingEvalContext(st)
if mockClient == nil {
- return nil, nil, errors.AssertionFailedf("non-nil streamclient required")
+ return nil, errors.AssertionFailedf("non-nil streamclient required")
}

testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
@@ -893,7 +904,7 @@ func getStreamIngestionProcessor(
sip.cutoverProvider = cutoverProvider
}

- return sip, st, err
+ return sip, err
}

func resolvedSpansMinTS(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp {
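The test-side changes all follow one pattern: getStreamIngestionProcessor no longer constructs its own cluster settings; the caller builds them and overrides the new quantize setting to 0 so assertions on exact resolved timestamps keep passing. A rough, dependency-free sketch of that injection pattern (testSettings, newProcessor, and resolve are invented names, not the real APIs):

package main

import "fmt"

// testSettings is a simplified stand-in for *cluster.Settings: values the
// processor reads at runtime and a test can pin before construction.
type testSettings struct {
	quantization int64 // nanoseconds; 0 disables quantization, like quantize.Override(ctx, &st.SV, 0)
}

type processor struct{ st *testSettings }

// newProcessor accepts settings from the caller instead of constructing them,
// mirroring the helper's new st *cluster.Settings parameter.
func newProcessor(st *testSettings) *processor { return &processor{st: st} }

// resolve applies the same rounding as bufferCheckpoint when quantization is on.
func (p *processor) resolve(wallNanos int64) int64 {
	if d := p.st.quantization; d > 0 {
		wallNanos -= wallNanos % d
	}
	return wallNanos
}

func main() {
	st := &testSettings{quantization: 0} // the test pins the knob before building the processor
	p := newProcessor(st)
	fmt.Println(p.resolve(5_300_000_000)) // exact value preserved: 5300000000
}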
