Skip to content

Commit da9c136

Browse files
rvicherynicolastakashi
authored andcommitted
receive: fix maxBufferedResponses channel size to avoid deadlock (thanos-io#7978)
* Fix maxBufferedResponses channel size to avoid deadlock Fixes thanos-io#7977 Signed-off-by: Remi Vichery <[email protected]> * Add changelog entry Signed-off-by: Remi Vichery <[email protected]> * adjust line numbers in docs/components/receive.md to match updated code Signed-off-by: Remi Vichery <[email protected]> --------- Signed-off-by: Remi Vichery <[email protected]>
1 parent 7ffc0ce commit da9c136

File tree

4 files changed

+58
-2
lines changed

4 files changed

+58
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1212

1313
### Fixed
1414

15+
- [#7978](https://github.com/thanos-io/thanos/pull/7978) Receive: Fix deadlock during local writes when `split-tenant-label-name` is used
16+
1517
### Added
1618

1719
- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.

docs/components/receive.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need
331331

332332
The following formula is used for calculating quorum:
333333

334-
```go mdox-exec="sed -n '1012,1022p' pkg/receive/handler.go"
334+
```go mdox-exec="sed -n '1015,1025p' pkg/receive/handler.go"
335335
// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
336336
func (h *Handler) writeQuorum() int {
337337
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes

pkg/receive/handler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,10 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
777777

778778
// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
779779
// asynchronously and with this capacity we will never block on writing to the channel.
780-
maxBufferedResponses := len(localWrites)
780+
var maxBufferedResponses int
781+
for er := range localWrites {
782+
maxBufferedResponses += len(localWrites[er])
783+
}
781784
for er := range remoteWrites {
782785
maxBufferedResponses += len(remoteWrites[er])
783786
}

pkg/receive/handler_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1820,6 +1820,57 @@ func TestDistributeSeries(t *testing.T) {
18201820
require.Equal(t, map[string]struct{}{"bar": {}, "boo": {}}, hr.seenTenants)
18211821
}
18221822

1823+
func TestHandlerSplitTenantLabelLocalWrite(t *testing.T) {
1824+
const tenantIDLabelName = "thanos_tenant_id"
1825+
1826+
appendable := &fakeAppendable{
1827+
appender: newFakeAppender(nil, nil, nil),
1828+
}
1829+
1830+
h := NewHandler(nil, &Options{
1831+
Endpoint: "localhost",
1832+
SplitTenantLabelName: tenantIDLabelName,
1833+
ReceiverMode: RouterIngestor,
1834+
ReplicationFactor: 1,
1835+
ForwardTimeout: 1 * time.Second,
1836+
Writer: NewWriter(
1837+
log.NewNopLogger(),
1838+
newFakeTenantAppendable(appendable),
1839+
&WriterOptions{},
1840+
),
1841+
})
1842+
1843+
// initialize hashring with a single local endpoint matching the handler endpoint to force
1844+
// using local write
1845+
hashring, err := newSimpleHashring([]Endpoint{
1846+
{
1847+
Address: h.options.Endpoint,
1848+
},
1849+
})
1850+
require.NoError(t, err)
1851+
hr := &hashringSeenTenants{Hashring: hashring}
1852+
h.Hashring(hr)
1853+
1854+
response, err := h.RemoteWrite(context.Background(), &storepb.WriteRequest{
1855+
Timeseries: []prompb.TimeSeries{
1856+
{
1857+
Labels: labelpb.ZLabelsFromPromLabels(
1858+
labels.FromStrings("a", "b", tenantIDLabelName, "bar"),
1859+
),
1860+
},
1861+
{
1862+
Labels: labelpb.ZLabelsFromPromLabels(
1863+
labels.FromStrings("b", "a", tenantIDLabelName, "foo"),
1864+
),
1865+
},
1866+
},
1867+
})
1868+
1869+
require.NoError(t, err)
1870+
require.NotNil(t, response)
1871+
require.Equal(t, map[string]struct{}{"bar": {}, "foo": {}}, hr.seenTenants)
1872+
}
1873+
18231874
func TestHandlerFlippingHashrings(t *testing.T) {
18241875
t.Parallel()
18251876

0 commit comments

Comments
 (0)