Skip to content

Commit 065c3be

Browse files
authored
Merge branch 'main' into close-loser-tree
Signed-off-by: Filip Petkovski <[email protected]>
2 parents 4550964 + ab43b2b commit 065c3be

26 files changed

+432
-213
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2424
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
2525
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
2626
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.
27+
- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals.
2728
- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.
2829

2930
### Added

cmd/thanos/compact.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ func runCompact(
290290
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
291291
api.SetLoaded(blocks, err)
292292
})
293+
294+
var syncMetasTimeout = conf.waitInterval
295+
if !conf.wait {
296+
syncMetasTimeout = 0
297+
}
293298
sy, err = compact.NewMetaSyncer(
294299
logger,
295300
reg,
@@ -299,6 +304,7 @@ func runCompact(
299304
ignoreDeletionMarkFilter,
300305
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
301306
compactMetrics.garbageCollectedBlocks,
307+
syncMetasTimeout,
302308
)
303309
if err != nil {
304310
return errors.Wrap(err, "create syncer")

cmd/thanos/query.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ func runQuery(
503503
dns.ResolverType(dnsSDResolver),
504504
),
505505
dnsSDInterval,
506+
logger,
506507
)
507508

508509
dnsEndpointProvider := dns.NewProvider(
@@ -608,7 +609,7 @@ func runQuery(
608609
fileSDCache.Update(update)
609610
endpoints.Update(ctxUpdate)
610611

611-
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
612+
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
612613
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
613614
}
614615

@@ -628,22 +629,22 @@ func runQuery(
628629
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
629630
resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval)
630631
defer resolveCancel()
631-
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
632+
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
632633
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
633634
}
634-
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
635+
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil {
635636
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
636637
}
637-
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
638+
if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil {
638639
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
639640
}
640-
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
641+
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil {
641642
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
642643
}
643-
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
644+
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil {
644645
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
645646
}
646-
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
647+
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil {
647648
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)
648649

649650
}

cmd/thanos/rule.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func runRule(
452452
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
453453
resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second)
454454
defer resolveCancel()
455-
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints); err != nil {
455+
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil {
456456
level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err)
457457
}
458458
return nil

cmd/thanos/sidecar.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ func runSidecar(
235235
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
236236
defer iterCancel()
237237

238+
if err := m.UpdateTimestamps(iterCtx); err != nil {
239+
level.Warn(logger).Log(
240+
"msg", "failed to fetch timestamps. Is Prometheus running? Retrying",
241+
"err", err,
242+
)
243+
return err
244+
}
245+
238246
if err := m.UpdateLabels(iterCtx); err != nil {
239247
level.Warn(logger).Log(
240248
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
@@ -266,16 +274,21 @@ func runSidecar(
266274
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
267275
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
268276
defer iterCancel()
277+
if err := m.UpdateTimestamps(iterCtx); err != nil {
278+
level.Warn(logger).Log("msg", "updating timestamps failed", "err", err)
279+
promUp.Set(0)
280+
statusProber.NotReady(err)
281+
return nil
282+
}
269283

270284
if err := m.UpdateLabels(iterCtx); err != nil {
271-
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
285+
level.Warn(logger).Log("msg", "updating labels failed", "err", err)
272286
promUp.Set(0)
273287
statusProber.NotReady(err)
274-
} else {
275-
promUp.Set(1)
276-
statusProber.Ready()
288+
return nil
277289
}
278-
290+
promUp.Set(1)
291+
statusProber.Ready()
279292
return nil
280293
})
281294
}, func(error) {
@@ -317,7 +330,7 @@ func runSidecar(
317330
}),
318331
info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) {
319332
if httpProbe.IsReady() {
320-
mint, maxt := promStore.Timestamps()
333+
mint, maxt := m.Timestamps()
321334
return &infopb.StoreInfo{
322335
MinTime: mint,
323336
MaxTime: maxt,
@@ -409,13 +422,6 @@ func runSidecar(
409422
if uploaded, err := s.Sync(ctx); err != nil {
410423
level.Warn(logger).Log("err", err, "uploaded", uploaded)
411424
}
412-
413-
minTime, _, err := s.Timestamps()
414-
if err != nil {
415-
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
416-
return nil
417-
}
418-
m.UpdateTimestamps(minTime, math.MaxInt64)
419425
return nil
420426
})
421427
}, func(error) {
@@ -490,16 +496,19 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error {
490496
return nil
491497
}
492498

493-
func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
499+
func (s *promMetadata) UpdateTimestamps(ctx context.Context) error {
494500
s.mtx.Lock()
495501
defer s.mtx.Unlock()
496502

497-
if mint < s.limitMinTime.PrometheusTimestamp() {
498-
mint = s.limitMinTime.PrometheusTimestamp()
503+
mint, err := s.client.LowestTimestamp(ctx, s.promURL)
504+
if err != nil {
505+
return err
499506
}
500507

501-
s.mint = mint
502-
s.maxt = maxt
508+
s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint)
509+
s.maxt = math.MaxInt64
510+
511+
return nil
503512
}
504513

505514
func (s *promMetadata) Labels() labels.Labels {

cmd/thanos/tools_bucket.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
870870
ignoreDeletionMarkFilter,
871871
stubCounter,
872872
stubCounter,
873+
0,
873874
)
874875
if err != nil {
875876
return errors.Wrap(err, "create syncer")
@@ -1413,6 +1414,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
14131414
ignoreDeletionMarkFilter,
14141415
stubCounter,
14151416
stubCounter,
1417+
0,
14161418
)
14171419
if err != nil {
14181420
return errors.Wrap(err, "create syncer")

internal/cortex/chunk/cache/memcached_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
251251
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
252252
defer cancel()
253253

254-
if err := c.provider.Resolve(ctx, c.addresses); err != nil {
254+
if err := c.provider.Resolve(ctx, c.addresses, true); err != nil {
255255
return err
256256
}
257257
servers = c.provider.Addresses()

pkg/cache/groupcache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
134134

135135
go func() {
136136
for {
137-
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil {
137+
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers, true); err != nil {
138138
level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err)
139139
} else {
140140
err := universe.Set(dnsGroupcacheProvider.Addresses()...)

pkg/cacheutil/memcached_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ type memcachedClient struct {
211211
// AddressProvider performs node address resolution given a list of clusters.
212212
type AddressProvider interface {
213213
// Resolves the provided list of memcached cluster to the actual nodes
214-
Resolve(context.Context, []string) error
214+
Resolve(context.Context, []string, bool) error
215215

216216
// Returns the nodes
217217
Addresses() []string
@@ -638,7 +638,7 @@ func (c *memcachedClient) resolveAddrs() error {
638638
defer cancel()
639639

640640
// If some of the dns resolution fails, log the error.
641-
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
641+
if err := c.addressProvider.Resolve(ctx, c.config.Addresses, true); err != nil {
642642
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
643643
}
644644
// Fail in case no server address is resolved.

pkg/clientconfig/http.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func (c HTTPFileSDConfig) convert() (file.SDConfig, error) {
330330
}
331331

332332
type AddressProvider interface {
333-
Resolve(context.Context, []string) error
333+
Resolve(context.Context, []string, bool) error
334334
Addresses() []string
335335
}
336336

@@ -433,5 +433,5 @@ func (c *HTTPClient) Discover(ctx context.Context) {
433433

434434
// Resolve refreshes and resolves the list of targets.
435435
func (c *HTTPClient) Resolve(ctx context.Context) error {
436-
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...))
436+
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...), true)
437437
}

0 commit comments

Comments
 (0)