Skip to content

Commit 690c0c7

Browse files
committed
query: Support chain deduplication algorithm
Signed-off-by: Michał Mazur <[email protected]>
1 parent e3acaeb commit 690c0c7

File tree

13 files changed

+315
-19
lines changed

13 files changed

+315
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2424
- [#7353](https://github.com/thanos-io/thanos/pull/7353) [#8045](https://github.com/thanos-io/thanos/pull/8045) Receiver/StoreGateway: Add `--matcher-cache-size` option to enable caching for regex matchers in series calls.
2525
- [#8017](https://github.com/thanos-io/thanos/pull/8017) Store Gateway: Use native histogram for binary reader load and download duration and fixed download duration metric. #8017
2626
- [#8131](https://github.com/thanos-io/thanos/pull/8131) Store Gateway: Optimize regex matchers for .* and .+. #8131
27+
- [#7808](https://github.com/thanos-io/thanos/pull/7808) Query: Support chain deduplication algorithm.
2728

2829
### Changed
2930

cmd/thanos/query.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/thanos-io/thanos/pkg/block"
2727
"github.com/thanos-io/thanos/pkg/compact/downsample"
2828
"github.com/thanos-io/thanos/pkg/component"
29+
"github.com/thanos-io/thanos/pkg/dedup"
2930
"github.com/thanos-io/thanos/pkg/discovery/dns"
3031
"github.com/thanos-io/thanos/pkg/exemplars"
3132
"github.com/thanos-io/thanos/pkg/extkingpin"
@@ -98,6 +99,11 @@ func registerQuery(app *extkingpin.App) {
9899
Default(string(query.ExternalLabels), string(query.StoreType)).
99100
Enums(string(query.ExternalLabels), string(query.StoreType))
100101

102+
deduplicationFunc := cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
103+
"Possible values are: \"penalty\", \"chain\". If no value is specified, penalty based deduplication algorithm will be used. "+
104+
"When set to chain, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. At least one replica label has to be set via --query.replica-label flag.").
105+
Default(dedup.AlgorithmPenalty).Enum(dedup.AlgorithmPenalty, dedup.AlgorithmChain)
106+
101107
queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules. Flag may be specified multiple times as well as a comma separated list of labels.").
102108
Strings()
103109
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()
@@ -300,6 +306,7 @@ func registerQuery(app *extkingpin.App) {
300306
*dynamicLookbackDelta,
301307
time.Duration(*defaultEvaluationInterval),
302308
time.Duration(*storeResponseTimeout),
309+
*deduplicationFunc,
303310
*queryReplicaLabels,
304311
*queryPartitionLabels,
305312
selectorLset,
@@ -361,6 +368,7 @@ func runQuery(
361368
dynamicLookbackDelta bool,
362369
defaultEvaluationInterval time.Duration,
363370
storeResponseTimeout time.Duration,
371+
deduplicationFunc string,
364372
queryReplicaLabels []string,
365373
queryPartitionLabels []string,
366374
selectorLset labels.Labels,
@@ -422,6 +430,7 @@ func runQuery(
422430
seriesProxy,
423431
maxConcurrentSelects,
424432
queryTimeout,
433+
deduplicationFunc,
425434
)
426435
remoteEndpointsCreator = query.NewRemoteEndpointsCreator(
427436
logger,

docs/components/query.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,15 @@ thanos query \
103103

104104
This logic can also be controlled via parameter on QueryAPI. More details below.
105105

106+
### Deduplication Algorithms
107+
108+
Thanos Querier supports different algorithms for deduplicating overlapping series. You can choose the deduplication algorithm using the `--deduplication.func` flag. The available options are:
109+
110+
* `penalty` (default): This is the default deduplication algorithm used by Thanos. It fills gaps only after a certain penalty window. This helps avoid flapping between replicas due to minor differences or delays.
111+
* `chain`: This algorithm performs 1:1 deduplication for samples. It merges all available data points from the replicas without applying any penalty. This is useful in deployments based on receivers, where each replica is populated by the same data. In such cases, using the penalty algorithm may cause gaps even when data is available in other replicas.
112+
113+
Note that deduplication of HA groups is not supported by the `chain` algorithm.
114+
106115
## Thanos PromQL Engine (experimental)
107116

108117
By default, Thanos querier comes with standard Prometheus PromQL engine. However, when `--query.promql-engine=thanos` is specified, Thanos will use [experimental Thanos PromQL engine](http://github.com/thanos-community/promql-engine) which is a drop-in, efficient implementation of PromQL engine with query planner and optimizers.
@@ -297,6 +306,16 @@ Flags:
297306
--auto-gomemlimit.ratio=0.9
298307
The ratio of reserved GOMEMLIMIT memory to the
299308
detected maximum container or system memory.
309+
--deduplication.func=penalty
310+
Experimental. Deduplication algorithm for
311+
merging overlapping series. Possible values
312+
are: "penalty", "chain". If no value is
313+
specified, penalty based deduplication
314+
algorithm will be used. When set to chain, the
315+
default compact deduplication merger is used,
316+
which performs 1:1 deduplication for samples.
317+
At least one replica label has to be set via
318+
--query.replica-label flag.
300319
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
301320
consumption.
302321
--endpoint=<endpoint> ... (Deprecated): Addresses of statically

pkg/api/query/grpc_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/thanos-io/thanos/pkg/api/query/querypb"
2424
"github.com/thanos-io/thanos/pkg/component"
25+
"github.com/thanos-io/thanos/pkg/dedup"
2526
"github.com/thanos-io/thanos/pkg/extpromql"
2627
"github.com/thanos-io/thanos/pkg/query"
2728
"github.com/thanos-io/thanos/pkg/store"
@@ -31,7 +32,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
3132
logger := log.NewNopLogger()
3233
reg := prometheus.NewRegistry()
3334
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
34-
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
35+
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
3536
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
3637
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
3738
api := NewGRPCAPI(time.Now, nil, queryableCreator, remoteEndpointsCreator, queryFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)
@@ -75,7 +76,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
7576
logger := log.NewNopLogger()
7677
reg := prometheus.NewRegistry()
7778
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
78-
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
79+
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
7980
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
8081
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
8182
tests := []struct {

pkg/api/query/v1_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
baseAPI "github.com/thanos-io/thanos/pkg/api"
5454
"github.com/thanos-io/thanos/pkg/compact"
5555
"github.com/thanos-io/thanos/pkg/component"
56+
"github.com/thanos-io/thanos/pkg/dedup"
5657
"github.com/thanos-io/thanos/pkg/gate"
5758
"github.com/thanos-io/thanos/pkg/query"
5859
"github.com/thanos-io/thanos/pkg/rules/rulespb"
@@ -213,7 +214,7 @@ func TestQueryEndpoints(t *testing.T) {
213214
baseAPI: &baseAPI.BaseAPI{
214215
Now: func() time.Time { return now },
215216
},
216-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
217+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
217218
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
218219
queryCreate: queryFactory,
219220
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
@@ -661,7 +662,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
661662
baseAPI: &baseAPI.BaseAPI{
662663
Now: func() time.Time { return now },
663664
},
664-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
665+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
665666
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
666667
queryCreate: queryFactory,
667668
defaultEngine: PromqlEnginePrometheus,
@@ -725,7 +726,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
725726
baseAPI: &baseAPI.BaseAPI{
726727
Now: func() time.Time { return now },
727728
},
728-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
729+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
729730
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
730731
queryCreate: queryFactory,
731732
defaultEngine: PromqlEnginePrometheus,
@@ -894,7 +895,7 @@ func TestMetadataEndpoints(t *testing.T) {
894895
baseAPI: &baseAPI.BaseAPI{
895896
Now: func() time.Time { return now },
896897
},
897-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
898+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
898899
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
899900
queryCreate: queryFactory,
900901
defaultEngine: PromqlEnginePrometheus,
@@ -911,7 +912,7 @@ func TestMetadataEndpoints(t *testing.T) {
911912
baseAPI: &baseAPI.BaseAPI{
912913
Now: func() time.Time { return now },
913914
},
914-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
915+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
915916
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
916917
queryCreate: queryFactory,
917918
defaultEngine: PromqlEnginePrometheus,

pkg/dedup/iter.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@ import (
1515
"github.com/thanos-io/thanos/pkg/store/storepb"
1616
)
1717

18+
const (
19+
AlgorithmPenalty = "penalty"
20+
AlgorithmChain = "chain"
21+
)
22+
1823
type dedupSeriesSet struct {
19-
set storage.SeriesSet
20-
isCounter bool
24+
set storage.SeriesSet
25+
isCounter bool
26+
deduplicationFunc string
2127

2228
replicas []storage.Series
2329

@@ -102,9 +108,9 @@ func (o *overlapSplitSet) Err() error {
102108

103109
// NewSeriesSet returns seriesSet that deduplicates the same series.
104110
// The series in series set are expected be sorted by all labels.
105-
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
111+
func NewSeriesSet(set storage.SeriesSet, f string, deduplicationFunc string) storage.SeriesSet {
106112
// TODO: remove dependency on knowing whether it is a counter.
107-
s := &dedupSeriesSet{set: set, isCounter: isCounter(f), f: f}
113+
s := &dedupSeriesSet{set: set, isCounter: isCounter(f), f: f, deduplicationFunc: deduplicationFunc}
108114
s.ok = s.set.Next()
109115
if s.ok {
110116
s.peek = s.set.At()
@@ -154,7 +160,11 @@ func (s *dedupSeriesSet) At() storage.Series {
154160
repl := make([]storage.Series, len(s.replicas))
155161
copy(repl, s.replicas)
156162

157-
return newDedupSeries(s.lset, repl, s.f)
163+
if s.deduplicationFunc == AlgorithmChain {
164+
return seriesWithLabels{Series: storage.ChainedSeriesMerge(repl...), lset: s.lset}
165+
} else {
166+
return newDedupSeries(s.lset, repl, s.f)
167+
}
158168
}
159169

160170
func (s *dedupSeriesSet) Err() error {

pkg/dedup/iter_test.go

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,151 @@ func TestDedupSeriesSet(t *testing.T) {
544544
if tcase.isCounter {
545545
f = "rate"
546546
}
547-
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
547+
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmPenalty)
548+
var ats []storage.Series
549+
for dedupSet.Next() {
550+
ats = append(ats, dedupSet.At())
551+
}
552+
testutil.Ok(t, dedupSet.Err())
553+
testutil.Equals(t, len(tcase.exp), len(ats))
554+
555+
for i, s := range ats {
556+
testutil.Equals(t, tcase.exp[i].lset, s.Labels(), "labels mismatch for series %v", i)
557+
res := expandSeries(t, s.Iterator(nil))
558+
testutil.Equals(t, tcase.exp[i].samples, res, "values mismatch for series :%v", i)
559+
}
560+
})
561+
}
562+
}
563+
564+
func TestDedupSeriesSet_Chain(t *testing.T) {
565+
for _, tcase := range []struct {
566+
name string
567+
input []series
568+
exp []series
569+
}{
570+
{
571+
name: "Single dedup label - exact match",
572+
input: []series{
573+
{
574+
lset: labels.FromStrings("a", "1", "c", "3"),
575+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
576+
},
577+
{
578+
lset: labels.FromStrings("a", "1", "c", "3"),
579+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
580+
},
581+
},
582+
exp: []series{
583+
{
584+
lset: labels.FromStrings("a", "1", "c", "3"),
585+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
586+
},
587+
},
588+
},
589+
{
590+
name: "Single dedup label - gap in one series",
591+
input: []series{
592+
{
593+
lset: labels.FromStrings("a", "1", "c", "3"),
594+
samples: []sample{{10000, 1}, {30000, 3}},
595+
},
596+
{
597+
lset: labels.FromStrings("a", "1", "c", "3"),
598+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
599+
},
600+
},
601+
exp: []series{
602+
{
603+
lset: labels.FromStrings("a", "1", "c", "3"),
604+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
605+
},
606+
},
607+
},
608+
{
609+
name: "Single dedup label - gaps in two series",
610+
input: []series{
611+
{
612+
lset: labels.FromStrings("a", "1", "c", "3"),
613+
samples: []sample{{10000, 1}, {30000, 3}},
614+
},
615+
{
616+
lset: labels.FromStrings("a", "1", "c", "3"),
617+
samples: []sample{{10000, 1}, {20000, 2}},
618+
},
619+
},
620+
exp: []series{
621+
{
622+
lset: labels.FromStrings("a", "1", "c", "3"),
623+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
624+
},
625+
},
626+
},
627+
{
628+
name: "Multi dedup label - exact match",
629+
input: []series{
630+
{
631+
lset: labels.FromStrings("a", "1", "c", "3"),
632+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
633+
},
634+
{
635+
lset: labels.FromStrings("a", "1", "c", "3"),
636+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
637+
},
638+
{
639+
lset: labels.FromStrings("a", "2", "c", "3"),
640+
samples: []sample{{10000, 101}, {20000, 102}, {30000, 103}},
641+
},
642+
{
643+
lset: labels.FromStrings("a", "2", "c", "3"),
644+
samples: []sample{{10000, 101}, {20000, 102}, {30000, 103}},
645+
},
646+
},
647+
exp: []series{
648+
{
649+
lset: labels.FromStrings("a", "1", "c", "3"),
650+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
651+
},
652+
{
653+
lset: labels.FromStrings("a", "2", "c", "3"),
654+
samples: []sample{{10000, 101}, {20000, 102}, {30000, 103}},
655+
},
656+
},
657+
},
658+
{
659+
name: "Multi dedup label - gaps in two series",
660+
input: []series{
661+
{
662+
lset: labels.FromStrings("a", "1", "c", "3"),
663+
samples: []sample{{20000, 2}, {30000, 3}},
664+
},
665+
{
666+
lset: labels.FromStrings("a", "1", "c", "3"),
667+
samples: []sample{{10000, 1}, {20000, 2}},
668+
},
669+
{
670+
lset: labels.FromStrings("a", "2", "c", "3"),
671+
samples: []sample{{10000, 101}, {20000, 102}},
672+
},
673+
{
674+
lset: labels.FromStrings("a", "2", "c", "3"),
675+
samples: []sample{{10000, 101}, {30000, 103}},
676+
},
677+
},
678+
exp: []series{
679+
{
680+
lset: labels.FromStrings("a", "1", "c", "3"),
681+
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}},
682+
},
683+
{
684+
lset: labels.FromStrings("a", "2", "c", "3"),
685+
samples: []sample{{10000, 101}, {20000, 102}, {30000, 103}},
686+
},
687+
},
688+
},
689+
} {
690+
t.Run(tcase.name, func(t *testing.T) {
691+
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, "", AlgorithmChain)
548692
var ats []storage.Series
549693
for dedupSet.Next() {
550694
ats = append(ats, dedupSet.At())

0 commit comments

Comments
 (0)