diff --git a/main.go b/main.go index 07834cbf..0a535f57 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,36 @@ const ( //nolint:funlen func main() { + // Create a pool + pool := network.NewPool() + + // Add a client to the pool + for i := 0; i < network.DefaultPoolSize; i++ { + client := network.NewClient("tcp", "localhost:5432", network.DefaultBufferSize) + if client != nil { + if err := pool.Put(client); err != nil { + logrus.Panic(err) + } + } + } + + // Verify that the pool is properly populated + logrus.Infof("There are %d clients in the pool", len(pool.ClientIDs())) + if len(pool.ClientIDs()) != network.DefaultPoolSize { + logrus.Error( + "The pool size is incorrect, either because " + + "the clients are cannot connect (no network connectivity) " + + "or the server is not running. Exiting...") + os.Exit(1) + } + + // Create a prefork proxy with the pool of clients + proxy := network.NewProxy(pool, false, false, &network.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: network.DefaultBufferSize, + }) + // Create a server server := network.NewServer( "tcp", @@ -24,10 +54,6 @@ func main() { 0, 0, network.DefaultTickInterval, - network.DefaultPoolSize, - network.DefaultBufferSize, - false, - false, []gnet.Option{ // Scheduling options gnet.WithMulticore(true), @@ -62,6 +88,7 @@ func main() { }, nil, nil, + proxy, ) // Shutdown the server gracefully diff --git a/network/proxy.go b/network/proxy.go index deb279ea..260cfa80 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "os" "sync" "github.com/panjf2000/gnet/v2" @@ -23,51 +22,32 @@ type Proxy interface { } type ProxyImpl struct { - pool Pool - connClients sync.Map + pool Pool + clients sync.Map - PoolSize int Elastic bool ReuseElasticClients bool - BufferSize int + + // ClientConfig is used for elastic proxy and reconnection + ClientConfig *Client } var _ Proxy = &ProxyImpl{} -func NewProxy(size, bufferSize int, elastic, reuseElasticClients bool) *ProxyImpl { +func NewProxy( + pool Pool, elastic, reuseElasticClients bool, clientConfig *Client, +) *ProxyImpl { proxy := ProxyImpl{ - pool: NewPool(), - connClients: sync.Map{}, - - PoolSize: size, + clients: sync.Map{}, Elastic: elastic, ReuseElasticClients: reuseElasticClients, + ClientConfig: clientConfig, } - if proxy.Elastic { - return &proxy - } - - if bufferSize == 0 { - proxy.BufferSize = DefaultBufferSize - } - - for i := 0; i < size; i++ { - client := NewClient("tcp", "localhost:5432", proxy.BufferSize) - if client != nil { - if err := proxy.pool.Put(client); err != nil { - logrus.Panic(err) - } - } - } - - logrus.Infof("There are %d clients in the pool", len(proxy.pool.ClientIDs())) - if len(proxy.pool.ClientIDs()) != size { - logrus.Error( - "The pool size is incorrect, either because " + - "the clients are cannot connect (no network connectivity) " + - "or the server is not running") - os.Exit(1) + if pool != nil { + proxy.pool = pool + } else { + proxy.pool = NewPool() } return &proxy @@ -81,7 +61,11 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { // Pool is exhausted if pr.Elastic { // Create a new client - client = NewClient("tcp", "localhost:5432", pr.BufferSize) + client = NewClient( + pr.ClientConfig.Network, + pr.ClientConfig.Address, + pr.ClientConfig.ReceiveBufferSize, + ) logrus.Debugf("Reused the client %s by putting it back in the pool", client.ID) } else { return ErrPoolExhausted @@ -93,7 +77,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { } if client.ID != "" { - pr.connClients.Store(gconn, client) + pr.clients.Store(gconn, client) logrus.Debugf("Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String()) } else { return ErrClientNotConnected @@ -107,12 +91,12 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { var client *Client - if cl, ok := pr.connClients.Load(gconn); ok { + if cl, ok := pr.clients.Load(gconn); ok { if c, ok := cl.(*Client); ok { client = c } } - pr.connClients.Delete(gconn) + pr.clients.Delete(gconn) // TODO: The connection is unstable when I put the client back in the pool // If the client is not in the pool, put it back @@ -142,7 +126,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn, onIncomingTraffic, onOutgoingT // that listens for data from the server and sends it to the client var client *Client - if c, ok := pr.connClients.Load(gconn); ok { + if c, ok := pr.clients.Load(gconn); ok { if cl, ok := c.(*Client); ok { client = cl } @@ -184,7 +168,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn, onIncomingTraffic, onOutgoingT // Reconnect the client client = pr.Reconnect(client) // Store the client in the map, replacing the old one - pr.connClients.Store(gconn, client) + pr.clients.Store(gconn, client) return err } @@ -211,7 +195,11 @@ func (pr *ProxyImpl) Reconnect(cl *Client) *Client { if cl != nil && cl.ID != "" { cl.Close() } - return NewClient("tcp", "localhost:5432", pr.BufferSize) + return NewClient( + pr.ClientConfig.Network, + pr.ClientConfig.Address, + pr.ClientConfig.ReceiveBufferSize, + ) } func (pr *ProxyImpl) Shutdown() { @@ -228,7 +216,7 @@ func (pr *ProxyImpl) Shutdown() { func (pr *ProxyImpl) Size() int { var size int - pr.connClients.Range(func(_, _ interface{}) bool { + pr.clients.Range(func(_, _ interface{}) bool { size++ return true }) diff --git a/network/proxy_test.go b/network/proxy_test.go index 700bfe43..485ec29a 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -19,12 +19,17 @@ func TestNewProxy(t *testing.T) { } }() - proxy := NewProxy(1, DefaultBufferSize, false, false) + // Create a connection pool + pool := NewPool() + assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize))) + + // Create a proxy with a fixed buffer pool + proxy := NewProxy(pool, false, false, nil) + assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.Size(), "Proxy should have no connected clients") assert.Equal(t, 1, len(proxy.pool.ClientIDs())) assert.NotEqual(t, "", proxy.pool.ClientIDs()[0]) - assert.Equal(t, 1, proxy.PoolSize) assert.Equal(t, false, proxy.Elastic) assert.Equal(t, false, proxy.ReuseElasticClients) @@ -32,25 +37,47 @@ func TestNewProxy(t *testing.T) { } func TestNewProxyElastic(t *testing.T) { - proxy := NewProxy(1, DefaultBufferSize, true, false) + // Create a connection pool + pool := NewPool() + + // Create a proxy with an elastic buffer pool + proxy := NewProxy(pool, true, false, &Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: DefaultBufferSize, + }) + assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.Size()) assert.Equal(t, 0, len(proxy.pool.ClientIDs())) - assert.Equal(t, 1, proxy.PoolSize) assert.Equal(t, true, proxy.Elastic) assert.Equal(t, false, proxy.ReuseElasticClients) + assert.Equal(t, "tcp", proxy.ClientConfig.Network) + assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address) + assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize) proxy.pool.Close() } func TestNewProxyElasticReuse(t *testing.T) { - proxy := NewProxy(1, DefaultBufferSize, true, true) + // Create a connection pool + pool := NewPool() + + // Create a proxy with an elastic buffer pool + proxy := NewProxy(pool, true, true, &Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: DefaultBufferSize, + }) + assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.Size()) assert.Equal(t, 0, len(proxy.pool.ClientIDs())) - assert.Equal(t, 1, proxy.PoolSize) assert.Equal(t, true, proxy.Elastic) assert.Equal(t, true, proxy.ReuseElasticClients) + assert.Equal(t, "tcp", proxy.ClientConfig.Network) + assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address) + assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize) proxy.pool.Close() } diff --git a/network/server.go b/network/server.go index 8f2e4135..cc0ffd8e 100644 --- a/network/server.go +++ b/network/server.go @@ -25,26 +25,26 @@ type Server struct { engine gnet.Engine proxy Proxy - Network string // tcp/udp/unix - Address string - Options []gnet.Option - SoftLimit uint64 - HardLimit uint64 - Status Status - TickInterval int - PoolSize int - ElasticPool bool - ReuseElasticClients bool - BufferSize int - OnIncomingTraffic Traffic - OnOutgoingTraffic Traffic + Network string // tcp/udp/unix + Address string + Options []gnet.Option + SoftLimit uint64 + HardLimit uint64 + Status Status + TickInterval int + // PoolSize int + // ElasticPool bool + // ReuseElasticClients bool + // BufferSize int + OnIncomingTraffic Traffic + OnOutgoingTraffic Traffic } func (s *Server) OnBoot(engine gnet.Engine) gnet.Action { s.engine = engine // Create a proxy with a fixed/elastic buffer pool - s.proxy = NewProxy(s.PoolSize, s.BufferSize, s.ElasticPool, s.ReuseElasticClients) + // s.proxy = NewProxy(s.PoolSize, s.BufferSize, s.ElasticPool, s.ReuseElasticClients) // Set the status to running s.Status = Running @@ -141,10 +141,10 @@ func (s *Server) IsRunning() bool { func NewServer( network, address string, softLimit, hardLimit uint64, - tickInterval, poolSize, bufferSize int, - elasticPool, reuseElasticClients bool, + tickInterval int, options []gnet.Option, onIncomingTraffic, onOutgoingTraffic Traffic, + proxy Proxy, ) *Server { server := Server{ Network: network, @@ -194,23 +194,6 @@ func NewServer( server.TickInterval = tickInterval } - if poolSize == 0 { - server.PoolSize = DefaultPoolSize - logrus.Debugf("Client connections is not set, using the default value") - } else { - server.PoolSize = poolSize - } - - if bufferSize == 0 { - server.BufferSize = DefaultBufferSize - logrus.Debugf("Buffer size is not set, using the default value") - } else { - server.BufferSize = bufferSize - } - - server.ElasticPool = elasticPool - server.ReuseElasticClients = reuseElasticClients - if onIncomingTraffic == nil { server.OnIncomingTraffic = func(gconn gnet.Conn, cl *Client, buf []byte, err error) error { // TODO: Implement the traffic handler @@ -231,5 +214,7 @@ func NewServer( server.OnOutgoingTraffic = onOutgoingTraffic } + server.proxy = proxy + return &server } diff --git a/network/server_test.go b/network/server_test.go index 24805ddc..ba88e754 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -31,6 +31,18 @@ func TestRunServer(t *testing.T) { return nil } + // Create a connection pool + pool := NewPool() + assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize))) + assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize))) + + // Create a proxy with a fixed buffer pool + proxy := NewProxy(pool, false, false, &Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: DefaultBufferSize, + }) + // Create a server server := NewServer( "tcp", @@ -38,15 +50,12 @@ func TestRunServer(t *testing.T) { 0, 0, DefaultTickInterval, - 2, - DefaultBufferSize, - false, - false, []gnet.Option{ gnet.WithMulticore(true), }, onIncomingTraffic, onOutgoingTraffic, + proxy, ) assert.NotNil(t, server)