Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
7 changes: 5 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,8 +1574,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down Expand Up @@ -3356,6 +3354,11 @@ func (r *bucketIndexReader) Close() error {
return nil
}

func (b *blockSeriesClient) CloseSend() error {
b.Close()
return nil
}

// LookupLabelsSymbols allows populates label set strings from symbolized label set.
func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error {
b.Reset(labels.EmptyLabels())
Expand Down
20 changes: 14 additions & 6 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
Expand Down Expand Up @@ -318,6 +319,7 @@ func newLazyRespSet(
frameTimeout: frameTimeout,
storeName: storeName,
storeLabelSets: storeLabelSets,
cl: cl,
closeSeries: closeSeries,
span: span,
dataOrFinishEvent: dataAvailable,
Expand Down Expand Up @@ -446,8 +448,10 @@ func newAsyncRespSet(
emptyStreamResponses prometheus.Counter,
) (respSet, error) {

var span opentracing.Span
var closeSeries context.CancelFunc
var (
span opentracing.Span
cancel context.CancelFunc
)

storeID, storeAddr, isLocalStore := storeInfo(st)
seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{
Expand All @@ -459,7 +463,7 @@ func newAsyncRespSet(
"store.addr": storeAddr,
})

seriesCtx, closeSeries = context.WithCancel(seriesCtx)
seriesCtx, cancel = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)

Expand All @@ -474,7 +478,7 @@ func newAsyncRespSet(

span.SetTag("err", err.Error())
span.Finish()
closeSeries()
cancel()
return nil, err
}

Expand All @@ -497,7 +501,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -509,7 +513,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -530,6 +534,7 @@ func (l *lazyRespSet) Close() {
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -539,6 +544,7 @@ type eagerRespSet struct {
// Generic parameters.
span opentracing.Span

cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
frameTimeout time.Duration

Expand Down Expand Up @@ -569,6 +575,7 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
cl: cl,
closeSeries: closeSeries,
frameTimeout: frameTimeout,
bufferedResponses: []*storepb.SeriesResponse{},
Expand Down Expand Up @@ -717,6 +724,7 @@ func (l *eagerRespSet) Close() {
l.closeSeries()
}
l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/proxy_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
Expand All @@ -43,6 +44,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
Expand All @@ -62,6 +64,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -71,6 +74,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
Expand All @@ -91,6 +95,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -100,6 +105,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -120,6 +126,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -129,6 +136,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
Expand All @@ -149,6 +157,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -158,6 +167,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -178,6 +188,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand All @@ -188,6 +199,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand Down Expand Up @@ -219,6 +231,12 @@ func TestProxyResponseTreeSort(t *testing.T) {
}
}

type nopClientSendCloser struct {
storepb.Store_SeriesClient
}

func (c nopClientSendCloser) CloseSend() error { return nil }

func TestSortWithoutLabels(t *testing.T) {
for _, tcase := range []struct {
input []*storepb.SeriesResponse
Expand Down Expand Up @@ -341,6 +359,7 @@ func BenchmarkKWayMerge(b *testing.B) {
for j := 0; j < 1000; j++ {
respSets = append(respSets, &eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")),
Expand Down
Loading
Loading