Skip to content

Commit

Permalink
rename to quorum
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Nov 22, 2024
1 parent 77347ce commit 7c98625
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
3 changes: 1 addition & 2 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ func (s *dedupSeriesSet) At() storage.Series {
copy(repl, s.replicas)
if s.deduplicationFunc == AlgorithmQuorum {
// merge all samples which are ingested via receiver, no skips.
// feed the merged series into dedup series which apply counter adjustment
return NewMergedSeries(s.lset, repl, s.f)
return NewQuorumSeries(s.lset, repl, s.f)
}
if s.deduplicationFunc == AlgorithmChain {
return seriesWithLabels{Series: storage.ChainedSeriesMerge(repl...), lset: s.lset}
Expand Down
32 changes: 16 additions & 16 deletions pkg/dedup/quorum_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
// when replicas has conflict values at the same timestamp, the first replica will be selected.
type mergedSeries struct {
// quorumSeries is a storage.Series that implements quorum algorithm.
// when replicas has conflict values at the same timestamp, the value in majority replica will be selected.
type quorumSeries struct {
lset labels.Labels
replicas []storage.Series

isCounter bool
}

func NewMergedSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
return &mergedSeries{
func NewQuorumSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
return &quorumSeries{
lset: lset,
replicas: replicas,

isCounter: isCounter(f),
}
}

func (m *mergedSeries) Labels() labels.Labels {
func (m *quorumSeries) Labels() labels.Labels {
return m.lset
}
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
oks := make([]bool, 0, len(m.replicas))
for _, r := range m.replicas {
Expand All @@ -47,7 +47,7 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
iters = append(iters, it)
oks = append(oks, ok)
}
return &mergedSeriesIterator{
return &quorumSeriesIterator{
iters: iters,
oks: oks,
lastT: math.MinInt64,
Expand Down Expand Up @@ -82,7 +82,7 @@ func (q *quorumValuePicker) addValue(v float64) bool {
return false
}

type mergedSeriesIterator struct {
type quorumSeriesIterator struct {
iters []adjustableSeriesIterator
oks []bool

Expand All @@ -91,7 +91,7 @@ type mergedSeriesIterator struct {
lastIter adjustableSeriesIterator
}

func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
func (m *quorumSeriesIterator) Next() chunkenc.ValueType {
// m.lastIter points to the last iterator that has the latest timestamp.
// m.lastT always aligns with m.lastIter unless when m.lastIter is nil.
// m.lastIter is nil only in the following cases:
Expand Down Expand Up @@ -131,7 +131,7 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
return chunkenc.ValFloat
}

func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
func (m *quorumSeriesIterator) Seek(t int64) chunkenc.ValueType {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for m.lastT < t && m.Next() != chunkenc.ValNone {
}
Expand All @@ -142,25 +142,25 @@ func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
return chunkenc.ValFloat
}

func (m *mergedSeriesIterator) At() (t int64, v float64) {
func (m *quorumSeriesIterator) At() (t int64, v float64) {
return m.lastIter.At()
}

func (m *mergedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
func (m *quorumSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
return m.lastIter.AtHistogram(h)
}

func (m *mergedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
func (m *quorumSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return m.lastIter.AtFloatHistogram(fh)
}

func (m *mergedSeriesIterator) AtT() int64 {
func (m *quorumSeriesIterator) AtT() int64 {
return m.lastT
}

// Err All At() funcs should panic if called after Next() or Seek() return ValNone.
// Only Err() should return nil even after Next() or Seek() return ValNone.
func (m *mergedSeriesIterator) Err() error {
func (m *quorumSeriesIterator) Err() error {
if m.lastIter == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dedup/quorum_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestIteratorEdgeCases(t *testing.T) {
ms := NewMergedSeries(labels.Labels{}, []storage.Series{}, "")
ms := NewQuorumSeries(labels.Labels{}, []storage.Series{}, "")
it := ms.Iterator(nil)
testutil.Ok(t, it.Err())
testutil.Equals(t, int64(math.MinInt64), it.AtT())
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ func newQuerierWithOpts(

partialResponseStrategy := storepb.PartialResponseStrategy_ABORT
if opts.GroupReplicaPartialResponseStrategy {
level.Debug(logger).Log("msg", "Enabled group-replica partial response strategy in newQuerierInternal")
level.Info(logger).Log("msg", "Enabled group-replica partial response strategy in newQuerierInternal")
partialResponseStrategy = storepb.PartialResponseStrategy_GROUP_REPLICA
} else if partialResponse {
partialResponseStrategy = storepb.PartialResponseStrategy_WARN
}
level.Info(logger).Log("msg", "Deduplication algorithm applied", "func", opts.DeduplicationFunc)
return &querier{
logger: logger,
selectGate: selectGate,
Expand Down

0 comments on commit 7c98625

Please sign in to comment.