Skip to content

Commit 2fdeba7

Browse files
committed
Optimize format and add more test
Signed-off-by: Filip Petkovski <[email protected]>
1 parent 504aba5 commit 2fdeba7

File tree

8 files changed

+332
-328
lines changed

8 files changed

+332
-328
lines changed

pkg/receive/handler.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap
406406

407407
// Close stops the Handler.
408408
func (h *Handler) Close() {
409+
_ = h.peers.Close()
409410
runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server")
410411
}
411412

@@ -1330,7 +1331,7 @@ type peerWorker struct {
13301331
forwardDelay prometheus.Histogram
13311332
}
13321333

1333-
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, useCapNProtoReplication bool, dialOpts ...grpc.DialOption) peersContainer {
1334+
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, useCapNProtoReplication bool, dialOpts ...grpc.DialOption) *peerGroup {
13341335
return &peerGroup{
13351336
dialOpts: dialOpts,
13361337
connections: map[Endpoint]*peerWorker{},
@@ -1350,6 +1351,7 @@ type peersContainer interface {
13501351
markPeerUnavailable(Endpoint)
13511352
markPeerAvailable(Endpoint)
13521353
reset()
1354+
io.Closer
13531355
}
13541356

13551357
func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
@@ -1392,6 +1394,13 @@ type peerGroup struct {
13921394
dialer func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
13931395
}
13941396

1397+
func (p *peerGroup) Close() error {
1398+
for _, c := range p.connections {
1399+
c.wp.Close()
1400+
}
1401+
return nil
1402+
}
1403+
13951404
func (p *peerGroup) close(endpoint Endpoint) error {
13961405
p.m.Lock()
13971406
defer p.m.Unlock()

pkg/receive/handler_test.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ type fakePeersGroup struct {
177177
closeCalled map[Endpoint]bool
178178
}
179179

180+
func (g *fakePeersGroup) Close() error {
181+
for _, c := range g.clients {
182+
c.wp.Close()
183+
}
184+
return nil
185+
}
186+
180187
func (g *fakePeersGroup) markPeerUnavailable(s Endpoint) {
181188
}
182189

@@ -253,19 +260,13 @@ func newTestHandlerHashring(
253260
peer = newPeerWorker(client, prometheus.NewHistogram(prometheus.HistogramOpts{}), 1)
254261
closers = append(closers, func() error {
255262
srv.Shutdown()
256-
peer.wp.Close()
257263
return goerrors.Join(listener.Close(), client.Close())
258264
})
259265
go func() { _ = srv.ListenAndServe() }()
260-
fakePeers.clients[endpoint] = peer
261266
} else {
262267
peer = newPeerWorker(&fakeRemoteWriteGRPCServer{h: h}, prometheus.NewHistogram(prometheus.HistogramOpts{}), 1)
263-
closers = append(closers, func() error {
264-
peer.wp.Close()
265-
return nil
266-
})
267-
fakePeers.clients[endpoint] = peer
268268
}
269+
fakePeers.clients[endpoint] = peer
269270
}
270271
// Use hashmod as default.
271272
if hashringAlgo == "" {
@@ -663,6 +664,12 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist
663664
}
664665
defer func() {
665666
testutil.Ok(t, closeFunc())
667+
// Wait a few milliseconds for peer workers to process the queue.
668+
time.AfterFunc(50*time.Millisecond, func() {
669+
for _, h := range handlers {
670+
h.Close()
671+
}
672+
})
666673
}()
667674
tenant := "test"
668675

pkg/receive/writecapnp/client.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net"
99
"sync"
1010

11+
"capnproto.org/go/capnp/v3"
1112
"capnproto.org/go/capnp/v3/rpc"
1213
"github.com/pkg/errors"
1314
"google.golang.org/grpc"
@@ -45,7 +46,7 @@ type RemoteWriteClient struct {
4546
mu sync.Mutex
4647

4748
dialer Dialer
48-
conn *net.Conn
49+
conn *rpc.Conn
4950

5051
writer Writer
5152
}
@@ -64,19 +65,32 @@ func (r *RemoteWriteClient) writeWithReconnect(ctx context.Context, numReconnect
6465
if err := r.connect(ctx); err != nil {
6566
return nil, err
6667
}
68+
arena := capnp.SingleSegment(nil)
69+
defer arena.Release()
70+
6771
result, release := r.writer.Write(ctx, func(params Writer_write_Params) error {
68-
wr, err := Build(in.Tenant, in.Timeseries)
72+
_, seg, err := capnp.NewMessage(arena)
73+
if err != nil {
74+
return err
75+
}
76+
wr, err := NewRootWriteRequest(seg)
6977
if err != nil {
7078
return err
7179
}
72-
return params.SetWr(wr)
80+
if err := params.SetWr(wr); err != nil {
81+
return err
82+
}
83+
wr, err = params.Wr()
84+
if err != nil {
85+
return err
86+
}
87+
return BuildInto(wr, in.Tenant, in.Timeseries)
7388
})
7489
defer release()
7590

7691
s, err := result.Struct()
7792
if err != nil {
78-
if numReconnects > 0 {
79-
_ = r.Close()
93+
if numReconnects > 0 && capnp.IsDisconnected(err) {
8094
if err := r.Close(); err != nil {
8195
return nil, err
8296
}
@@ -110,20 +124,18 @@ func (r *RemoteWriteClient) connect(ctx context.Context) error {
110124
if err != nil {
111125
return errors.Wrap(err, "failed to dial peer")
112126
}
113-
r.writer = Writer(rpc.NewConn(rpc.NewPackedStreamTransport(conn), nil).Bootstrap(ctx))
114-
r.conn = &conn
127+
r.conn = rpc.NewConn(rpc.NewPackedStreamTransport(conn), nil)
128+
r.writer = Writer(r.conn.Bootstrap(ctx))
115129
return nil
116130
}
117131

118132
func (r *RemoteWriteClient) Close() error {
119133
r.mu.Lock()
120-
defer r.mu.Unlock()
121134
if r.conn != nil {
122-
r.writer.Release()
123-
124-
conn := *r.conn
135+
conn := r.conn
125136
r.conn = nil
126-
return conn.Close()
137+
go conn.Close()
127138
}
139+
r.mu.Unlock()
128140
return nil
129141
}

0 commit comments

Comments
 (0)