From c7586b7fa3223a382b31099ab51dbb49addfc596 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 19 Nov 2022 21:39:27 +0100 Subject: [PATCH 1/6] Simplify pool.pop by using sync.Map.LoadAndDelete --- network/pool.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/network/pool.go b/network/pool.go index 70e129cc..7f02d87d 100644 --- a/network/pool.go +++ b/network/pool.go @@ -63,8 +63,7 @@ func (p *PoolImpl) Put(client *Client) error { } func (p *PoolImpl) Pop(id string) *Client { - if client, ok := p.pool.Load(id); ok { - p.pool.Delete(id) + if client, ok := p.pool.LoadAndDelete(id); ok { p.logger.Debug().Msgf("Client %s has been popped from the pool", id) if c, ok := client.(*Client); ok { return c From 160a07bc955eddfd800bc6683345c1b07e1a7546 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 19 Nov 2022 23:16:38 +0100 Subject: [PATCH 2/6] Create a generic pool for both client and server connections --- network/pool.go | 91 +++++++++++++----------------------------- network/proxy.go | 102 +++++++++++++++++++++++------------------------ 2 files changed, 76 insertions(+), 117 deletions(-) diff --git a/network/pool.go b/network/pool.go index 7f02d87d..e772a238 100644 --- a/network/pool.go +++ b/network/pool.go @@ -7,15 +7,18 @@ import ( "github.com/rs/zerolog" ) +type Callback func(key, value interface{}) bool + type Pool interface { - ForEach(callback func(client *Client) error) + ForEach(Callback) Pool() *sync.Map - ClientIDs() []string - Put(client *Client) error - Pop(ID string) *Client + // ClientIDs() []string + Put(key, value interface{}) + Pop(key interface{}) interface{} Size() int - Close() error - Shutdown() + Clear() + // Close() error + // Shutdown() } type PoolImpl struct { @@ -25,50 +28,23 @@ type PoolImpl struct { var _ Pool = &PoolImpl{} -func (p *PoolImpl) ForEach(callback func(client *Client) error) { - p.pool.Range(func(key, value interface{}) bool { - if c, ok := value.(*Client); ok { - err := callback(c) - if err != nil { - p.logger.Debug().Err(err).Msg("an error occurred running the callback") - } - return true - } - - return false - }) +func (p *PoolImpl) ForEach(cb Callback) { + p.pool.Range(cb) } func (p *PoolImpl) Pool() *sync.Map { return &p.pool } -func (p *PoolImpl) ClientIDs() []string { - var ids []string - p.pool.Range(func(key, _ interface{}) bool { - if id, ok := key.(string); ok { - ids = append(ids, id) - return true - } - return false - }) - return ids +func (p *PoolImpl) Put(key, value interface{}) { + p.pool.Store(key, value) + p.logger.Debug().Msg("Item has been put on the pool") } -func (p *PoolImpl) Put(client *Client) error { - p.pool.Store(client.ID, client) - p.logger.Debug().Msgf("Client %s has been put on the pool", client.ID) - - return nil -} - -func (p *PoolImpl) Pop(id string) *Client { - if client, ok := p.pool.LoadAndDelete(id); ok { - p.logger.Debug().Msgf("Client %s has been popped from the pool", id) - if c, ok := client.(*Client); ok { - return c - } - return nil +func (p *PoolImpl) Pop(key interface{}) interface{} { + if value, ok := p.pool.LoadAndDelete(key); ok { + p.logger.Debug().Msg("Item has been popped from the pool") + return value } return nil @@ -84,25 +60,14 @@ func (p *PoolImpl) Size() int { return size } -func (p *PoolImpl) Close() error { - p.ForEach(func(client *Client) error { - client.Close() - return nil - }) - - return nil +func (p *PoolImpl) Clear() { + p.pool = sync.Map{} } -func (p *PoolImpl) Shutdown() { - p.pool.Range(func(key, value interface{}) bool { - if cl, ok := value.(*Client); ok { - cl.Close() - } - p.pool.Delete(key) - return true - }) - - p.pool = sync.Map{} +func NewEmptyPool(logger zerolog.Logger) Pool { + return &PoolImpl{ + logger: logger, + } } func NewPool( @@ -130,15 +95,13 @@ func NewPool( } if client != nil { - if err := pool.Put(client); err != nil { - logger.Panic().Err(err).Msg("Failed to add client to pool") - } + pool.Put(client.ID, client) } } // Verify that the pool is properly populated - logger.Info().Msgf("There are %d clients in the pool", len(pool.ClientIDs())) - if len(pool.ClientIDs()) != poolSize { + logger.Info().Msgf("There are %d clients in the pool", pool.Size()) + if pool.Size() != poolSize { logger.Error().Msg( "The pool size is incorrect, either because " + "the clients are cannot connect (no network connectivity) " + diff --git a/network/proxy.go b/network/proxy.go index 87c683fe..3a86565b 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "sync" "github.com/panjf2000/gnet/v2" "github.com/rs/zerolog" @@ -18,13 +17,12 @@ type Proxy interface { PassThrough(gconn gnet.Conn, onIncomingTraffic, onOutgoingTraffic map[Prio]Traffic) error Reconnect(cl *Client) *Client Shutdown() - Size() int } type ProxyImpl struct { - pool Pool - clients sync.Map - logger zerolog.Logger + availableConnections Pool + busyConnections Pool + logger zerolog.Logger Elastic bool ReuseElasticClients bool @@ -38,21 +36,24 @@ var _ Proxy = &ProxyImpl{} func NewProxy( pool Pool, elastic, reuseElasticClients bool, clientConfig *Client, logger zerolog.Logger, ) *ProxyImpl { - proxy := ProxyImpl{ - clients: sync.Map{}, - logger: logger, - Elastic: elastic, - ReuseElasticClients: reuseElasticClients, - ClientConfig: clientConfig, + return &ProxyImpl{ + availableConnections: pool, + busyConnections: NewEmptyPool(logger), + logger: logger, + Elastic: elastic, + ReuseElasticClients: reuseElasticClients, + ClientConfig: clientConfig, } - - proxy.pool = pool - - return &proxy } func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { - clientIDs := pr.pool.ClientIDs() + var clientIDs []string + pr.availableConnections.ForEach(func(key, _ interface{}) bool { + if clientID, ok := key.(string); ok { + clientIDs = append(clientIDs, clientID) + } + return true + }) var client *Client if len(clientIDs) == 0 { @@ -72,47 +73,43 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { } else { // Get a client from the pool pr.logger.Debug().Msgf("Available clients: %v", len(clientIDs)) - client = pr.pool.Pop(clientIDs[0]) + if cl, ok := pr.availableConnections.Pop(clientIDs[0]).(*Client); !ok { + client = cl + } } if client.ID != "" { - pr.clients.Store(gconn, client) + pr.busyConnections.Put(gconn, client) pr.logger.Debug().Msgf("Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String()) } else { return ErrClientNotConnected } - pr.logger.Debug().Msgf("[C] There are %d clients in the pool", len(pr.pool.ClientIDs())) - pr.logger.Debug().Msgf("[C] There are %d clients in use", pr.Size()) + pr.logger.Debug().Msgf("[C] There are %d clients in the pool", pr.availableConnections.Size()) + pr.logger.Debug().Msgf("[C] There are %d clients in use", pr.busyConnections.Size()) return nil } func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { var client *Client - if cl, ok := pr.clients.Load(gconn); ok { - if c, ok := cl.(*Client); ok { - client = c - } + if cl, ok := pr.busyConnections.Pop(gconn).(*Client); !ok { + client = cl } - pr.clients.Delete(gconn) // TODO: The connection is unstable when I put the client back in the pool // If the client is not in the pool, put it back if pr.Elastic && pr.ReuseElasticClients || !pr.Elastic { client = pr.Reconnect(client) if client != nil && client.ID != "" { - if err := pr.pool.Put(client); err != nil { - pr.logger.Error().Err(err).Msgf("Failed to put the client %s back in the pool", client.ID) - return fmt.Errorf("failed to put the client back in the pool: %w", err) - } + pr.availableConnections.Put(client.ID, client) } } else { client.Close() } - pr.logger.Debug().Msgf("[D] There are %d clients in the pool", len(pr.pool.ClientIDs())) - pr.logger.Debug().Msgf("[D] There are %d clients in use", pr.Size()) + pr.logger.Debug().Msgf("[D] There are %d clients in the pool", pr.availableConnections.Size()) + pr.logger.Debug().Msgf("[D] There are %d clients in use", pr.busyConnections.Size()) return nil } @@ -130,10 +127,8 @@ func (pr *ProxyImpl) PassThrough( // that listens for data from the server and sends it to the client var client *Client - if c, ok := pr.clients.Load(gconn); ok { - if cl, ok := c.(*Client); ok { - client = cl - } + if cl, ok := pr.busyConnections.Pop(gconn).(*Client); ok { + client = cl } else { return ErrClientNotFound } @@ -175,8 +170,8 @@ func (pr *ProxyImpl) PassThrough( // server forceful closed the connection // Reconnect the client client = pr.Reconnect(client) - // Store the client in the map, replacing the old one - pr.clients.Store(gconn, client) + // Put the client in the busy connections pool, effectively replacing the old one + pr.busyConnections.Put(gconn, client) return err } @@ -212,23 +207,24 @@ func (pr *ProxyImpl) Reconnect(cl *Client) *Client { } func (pr *ProxyImpl) Shutdown() { - pr.pool.Shutdown() - pr.logger.Debug().Msg("All busy client connections have been closed") - - availableClients := pr.pool.ClientIDs() - for _, clientID := range availableClients { - client := pr.pool.Pop(clientID) - client.Close() - } - pr.logger.Debug().Msg("All available client connections have been closed") -} - -func (pr *ProxyImpl) Size() int { - var size int - pr.clients.Range(func(_, _ interface{}) bool { - size++ + pr.availableConnections.ForEach(func(key, value interface{}) bool { + if cl, ok := value.(*Client); ok { + cl.Close() + } return true }) + pr.availableConnections.Clear() + pr.logger.Debug().Msg("All available connections have been closed") - return size + pr.busyConnections.ForEach(func(key, value interface{}) bool { + if gconn, ok := key.(gnet.Conn); ok { + gconn.Close() + } + if cl, ok := value.(*Client); ok { + cl.Close() + } + return true + }) + pr.busyConnections.Clear() + pr.logger.Debug().Msg("All busy connections have been closed") } From a0171845d798e04aa542f339051cdc88f6b23fb0 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 19 Nov 2022 23:17:32 +0100 Subject: [PATCH 3/6] Update tests to reflect changes to making the pool generic --- network/pool_test.go | 96 +++++++++++++++++++----------------------- network/proxy_test.go | 17 ++++---- network/server_test.go | 8 ++-- 3 files changed, 58 insertions(+), 63 deletions(-) diff --git a/network/pool_test.go b/network/pool_test.go index c765625f..383203a2 100644 --- a/network/pool_test.go +++ b/network/pool_test.go @@ -19,24 +19,13 @@ func TestNewPool(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) } -func TestPool_Put(t *testing.T) { - postgres := embeddedpostgres.NewDatabase() - if err := postgres.Start(); err != nil { - t.Fatal(err) - } - - defer func() { - if err := postgres.Stop(); err != nil { - t.Fatal(err) - } - }() - +func TestNewEmptyPool(t *testing.T) { cfg := logging.LoggerConfig{ Output: nil, TimeFormat: zerolog.TimeFormatUnix, @@ -45,19 +34,14 @@ func TestPool_Put(t *testing.T) { } logger := logging.NewLogger(cfg) - - pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + pool := NewEmptyPool(logger) + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize, logger))) - assert.Equal(t, 1, pool.Size()) - assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize, logger))) - assert.Equal(t, 2, pool.Size()) } -func TestPool_Pop(t *testing.T) { +func TestPool_Put(t *testing.T) { postgres := embeddedpostgres.NewDatabase() if err := postgres.Start(); err != nil { t.Fatal(err) @@ -79,25 +63,19 @@ func TestPool_Pop(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client1)) + pool.Put(client1.ID, client1) assert.Equal(t, 1, pool.Size()) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client2)) + pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) - client := pool.Pop(client1.ID) - assert.Equal(t, client1.ID, client.ID) - assert.Equal(t, 1, pool.Size()) - client = pool.Pop(client2.ID) - assert.Equal(t, client2.ID, client.ID) - assert.Equal(t, 0, pool.Size()) } -func TestPool_Close(t *testing.T) { +func TestPool_Pop(t *testing.T) { postgres := embeddedpostgres.NewDatabase() if err := postgres.Start(); err != nil { t.Fatal(err) @@ -119,21 +97,25 @@ func TestPool_Close(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client1)) + pool.Put(client1.ID, client1) assert.Equal(t, 1, pool.Size()) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client2)) - assert.Equal(t, 2, pool.Size()) - err := pool.Close() - assert.Nil(t, err) + pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) + client := pool.Pop(client1.ID).(*Client) + assert.Equal(t, client1.ID, client.ID) + assert.Equal(t, 1, pool.Size()) + client = pool.Pop(client2.ID).(*Client) + assert.Equal(t, client2.ID, client.ID) + assert.Equal(t, 0, pool.Size()) } -func TestPool_Shutdown(t *testing.T) { +func TestPool_Clear(t *testing.T) { postgres := embeddedpostgres.NewDatabase() if err := postgres.Start(); err != nil { t.Fatal(err) @@ -155,17 +137,17 @@ func TestPool_Shutdown(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client1)) + pool.Put(client1.ID, client1) assert.Equal(t, 1, pool.Size()) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client2)) + pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) - pool.Shutdown() + pool.Clear() assert.Equal(t, 0, pool.Size()) } @@ -191,23 +173,23 @@ func TestPool_ForEach(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client1)) + pool.Put(client1.ID, client1) assert.Equal(t, 1, pool.Size()) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client2)) + pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) - pool.ForEach(func(client *Client) error { - assert.NotNil(t, client) - return nil + pool.ForEach(func(key, value interface{}) bool { + assert.NotNil(t, value.(*Client)) + return true }) } -func TestPool_ClientIDs(t *testing.T) { +func TestPool_GetClientIDs(t *testing.T) { postgres := embeddedpostgres.NewDatabase() if err := postgres.Start(); err != nil { t.Fatal(err) @@ -229,16 +211,26 @@ func TestPool_ClientIDs(t *testing.T) { logger := logging.NewLogger(cfg) pool := NewPool(logger, 0, nil, nil) - defer pool.Close() + defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client1)) + pool.Put(client1.ID, client1) assert.Equal(t, 1, pool.Size()) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - assert.NoError(t, pool.Put(client2)) + pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) - ids := pool.ClientIDs() + + var ids []string + pool.ForEach(func(key, value interface{}) bool { + ids = append(ids, key.(string)) + return true + }) assert.Equal(t, 2, len(ids)) + assert.Equal(t, client1.ID, ids[0]) + assert.Equal(t, client2.ID, ids[1]) + client1.Close() + client2.Close() + pool.Clear() } diff --git a/network/proxy_test.go b/network/proxy_test.go index 76bb8626..8d2476dc 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -32,19 +32,20 @@ func TestNewProxy(t *testing.T) { // Create a connection pool pool := NewPool(logger, 0, nil, nil) - assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize, logger))) + client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + pool.Put(client.ID, client) // Create a proxy with a fixed buffer pool proxy := NewProxy(pool, false, false, nil, logger) assert.NotNil(t, proxy) - assert.Equal(t, 0, proxy.Size(), "Proxy should have no connected clients") - assert.Equal(t, 1, len(proxy.pool.ClientIDs())) - assert.NotEqual(t, "", proxy.pool.ClientIDs()[0]) + assert.Equal(t, 0, proxy.busyConnections.Size(), "Proxy should have no connected clients") + assert.Equal(t, 1, proxy.availableConnections.Size()) + assert.NotEqual(t, client.ID, proxy.availableConnections.Pop(client.ID).(*Client).ID) assert.Equal(t, false, proxy.Elastic) assert.Equal(t, false, proxy.ReuseElasticClients) - proxy.pool.Close() + proxy.availableConnections.Clear() } func TestNewProxyElastic(t *testing.T) { @@ -68,13 +69,13 @@ func TestNewProxyElastic(t *testing.T) { }, logger) assert.NotNil(t, proxy) - assert.Equal(t, 0, proxy.Size()) - assert.Equal(t, 0, len(proxy.pool.ClientIDs())) + assert.Equal(t, 0, proxy.busyConnections.Size()) + assert.Equal(t, 0, proxy.availableConnections.Size()) assert.Equal(t, true, proxy.Elastic) assert.Equal(t, false, proxy.ReuseElasticClients) assert.Equal(t, "tcp", proxy.ClientConfig.Network) assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address) assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize) - proxy.pool.Close() + proxy.availableConnections.Clear() } diff --git a/network/server_test.go b/network/server_test.go index 9965339f..8982f989 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -47,9 +47,11 @@ func TestRunServer(t *testing.T) { hooksConfig.AddHook(OnOutgoingTraffic, 1, onOutgoingTraffic) // Create a connection pool - pool := NewPool(logger, 0, nil, nil) - assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize, logger))) - assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize, logger))) + pool := NewEmptyPool(logger) + client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + pool.Put(client1.ID, client1) + client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + pool.Put(client2.ID, client2) // Create a proxy with a fixed buffer pool proxy := NewProxy(pool, false, false, &Client{ From 0ec97241029db5c91ce3638f368d6f962a9c4c3e Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 19 Nov 2022 23:55:14 +0100 Subject: [PATCH 4/6] Fix linter errors --- network/pool_test.go | 26 ++++++++++++++++++-------- network/proxy_test.go | 4 +++- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/network/pool_test.go b/network/pool_test.go index 383203a2..33b39dc0 100644 --- a/network/pool_test.go +++ b/network/pool_test.go @@ -107,12 +107,18 @@ func TestPool_Pop(t *testing.T) { client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) - client := pool.Pop(client1.ID).(*Client) - assert.Equal(t, client1.ID, client.ID) - assert.Equal(t, 1, pool.Size()) - client = pool.Pop(client2.ID).(*Client) - assert.Equal(t, client2.ID, client.ID) - assert.Equal(t, 0, pool.Size()) + if c1, ok := pool.Pop(client1.ID).(*Client); !ok { + assert.Equal(t, c1, client1) + } else { + assert.Equal(t, client1.ID, c1.ID) + assert.Equal(t, 1, pool.Size()) + } + if c2, ok := pool.Pop(client2.ID).(*Client); !ok { + assert.Equal(t, c2, client2) + } else { + assert.Equal(t, client2.ID, c2.ID) + assert.Equal(t, 0, pool.Size()) + } } func TestPool_Clear(t *testing.T) { @@ -184,7 +190,9 @@ func TestPool_ForEach(t *testing.T) { pool.Put(client2.ID, client2) assert.Equal(t, 2, pool.Size()) pool.ForEach(func(key, value interface{}) bool { - assert.NotNil(t, value.(*Client)) + if c, ok := value.(*Client); ok { + assert.NotNil(t, c) + } return true }) } @@ -224,7 +232,9 @@ func TestPool_GetClientIDs(t *testing.T) { var ids []string pool.ForEach(func(key, value interface{}) bool { - ids = append(ids, key.(string)) + if id, ok := key.(string); ok { + ids = append(ids, id) + } return true }) assert.Equal(t, 2, len(ids)) diff --git a/network/proxy_test.go b/network/proxy_test.go index 8d2476dc..11d7a2d0 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -41,7 +41,9 @@ func TestNewProxy(t *testing.T) { assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.busyConnections.Size(), "Proxy should have no connected clients") assert.Equal(t, 1, proxy.availableConnections.Size()) - assert.NotEqual(t, client.ID, proxy.availableConnections.Pop(client.ID).(*Client).ID) + if c, ok := proxy.availableConnections.Pop(client.ID).(*Client); ok { + assert.NotEqual(t, "", c.ID) + } assert.Equal(t, false, proxy.Elastic) assert.Equal(t, false, proxy.ReuseElasticClients) From 7097ab19b9b8c7fc72f9a6ca71dcf68a21c08399 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 19 Nov 2022 23:59:39 +0100 Subject: [PATCH 5/6] Fix test when checking for a value in an array --- network/pool_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/pool_test.go b/network/pool_test.go index 33b39dc0..158a8286 100644 --- a/network/pool_test.go +++ b/network/pool_test.go @@ -238,8 +238,8 @@ func TestPool_GetClientIDs(t *testing.T) { return true }) assert.Equal(t, 2, len(ids)) - assert.Equal(t, client1.ID, ids[0]) - assert.Equal(t, client2.ID, ids[1]) + assert.Contains(t, client1.ID, ids[0]) + assert.Contains(t, client2.ID, ids[1]) client1.Close() client2.Close() pool.Clear() From 32dcb9d8f554fb3fbd5e78e6538f08f96bd4da37 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 20 Nov 2022 00:10:53 +0100 Subject: [PATCH 6/6] Fix bug in proxy.Connect --- network/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/proxy.go b/network/proxy.go index 3a86565b..0462f32d 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -73,7 +73,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { } else { // Get a client from the pool pr.logger.Debug().Msgf("Available clients: %v", len(clientIDs)) - if cl, ok := pr.availableConnections.Pop(clientIDs[0]).(*Client); !ok { + if cl, ok := pr.availableConnections.Pop(clientIDs[0]).(*Client); ok { client = cl } }