From 09108991531e430b091ecdc242375415bbeada14 Mon Sep 17 00:00:00 2001 From: Qingyang Hu Date: Fri, 29 Mar 2024 15:12:52 -0400 Subject: [PATCH 1/2] GODRIVER-3107 Fix leaking rttMonitor.runHellos() routine. --- x/mongo/driver/topology/rtt_monitor.go | 6 ------ x/mongo/driver/topology/server.go | 9 ++++++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/x/mongo/driver/topology/rtt_monitor.go b/x/mongo/driver/topology/rtt_monitor.go index 3dd031f2ea..89fc30cbd7 100644 --- a/x/mongo/driver/topology/rtt_monitor.go +++ b/x/mongo/driver/topology/rtt_monitor.go @@ -56,7 +56,6 @@ type rttMonitor struct { cfg *rttConfig ctx context.Context cancelFn context.CancelFunc - started bool } var _ driver.RTTMonitor = &rttMonitor{} @@ -83,7 +82,6 @@ func (r *rttMonitor) connect() { r.connMu.Lock() defer r.connMu.Unlock() - r.started = true r.closeWg.Add(1) go func() { @@ -97,10 +95,6 @@ func (r *rttMonitor) disconnect() { r.connMu.Lock() defer r.connMu.Unlock() - if !r.started { - return - } - r.cancelFn() // Wait for the existing connection to complete. diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index f4c6d744aa..b8fd2b3d49 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -125,6 +125,7 @@ type Server struct { processErrorLock sync.Mutex rttMonitor *rttMonitor + monitorOnce *sync.Once } // updateTopologyCallback is a callback used to create a server that should be called when the parent Topology instance @@ -168,6 +169,8 @@ func NewServer(addr address.Address, topologyID primitive.ObjectID, opts ...Serv subscribers: make(map[uint64]chan description.Server), globalCtx: globalCtx, globalCtxCancel: globalCtxCancel, + + monitorOnce: new(sync.Once), } s.desc.Store(description.NewDefaultServer(addr)) rttCfg := &rttConfig{ @@ -285,10 +288,10 @@ func (s *Server) Disconnect(ctx context.Context) error { close(s.done) s.cancelCheck() - s.rttMonitor.disconnect() s.pool.close(ctx) s.closewg.Wait() + s.rttMonitor.disconnect() atomic.StoreInt64(&s.state, serverDisconnected) return nil @@ -661,8 +664,8 @@ func (s *Server) update() { transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil && previousDescription.Kind != description.Unknown - if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started { - s.rttMonitor.connect() + if isStreamingEnabled(s) && isStreamable(s) { + s.monitorOnce.Do(s.rttMonitor.connect) } if isStreamable(s) || connectionIsStreaming || transitionedFromNetworkError { From 1fdb3d4d7889bbaaba6a1085d09033addf7a288c Mon Sep 17 00:00:00 2001 From: Qingyang Hu Date: Thu, 18 Apr 2024 11:53:15 -0400 Subject: [PATCH 2/2] remove pointer of sync.Once --- x/mongo/driver/topology/server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index b8fd2b3d49..99f8dd618b 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -125,7 +125,7 @@ type Server struct { processErrorLock sync.Mutex rttMonitor *rttMonitor - monitorOnce *sync.Once + monitorOnce sync.Once } // updateTopologyCallback is a callback used to create a server that should be called when the parent Topology instance @@ -169,8 +169,6 @@ func NewServer(addr address.Address, topologyID primitive.ObjectID, opts ...Serv subscribers: make(map[uint64]chan description.Server), globalCtx: globalCtx, globalCtxCancel: globalCtxCancel, - - monitorOnce: new(sync.Once), } s.desc.Store(description.NewDefaultServer(addr)) rttCfg := &rttConfig{