Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,43 @@ const (

//nolint:funlen
func main() {
// Create a pool
pool := network.NewPool()

// Add a client to the pool
for i := 0; i < network.DefaultPoolSize; i++ {
client := network.NewClient("tcp", "localhost:5432", network.DefaultBufferSize)
if client != nil {
if err := pool.Put(client); err != nil {
logrus.Panic(err)
}
}
}

// Verify that the pool is properly populated
logrus.Infof("There are %d clients in the pool", len(pool.ClientIDs()))
if len(pool.ClientIDs()) != network.DefaultPoolSize {
logrus.Error(
"The pool size is incorrect, either because " +
"the clients are cannot connect (no network connectivity) " +
"or the server is not running. Exiting...")
os.Exit(1)
}

// Create a prefork proxy with the pool of clients
proxy := network.NewProxy(pool, false, false, &network.Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: network.DefaultBufferSize,
})

// Create a server
server := network.NewServer(
"tcp",
"0.0.0.0:15432",
0,
0,
network.DefaultTickInterval,
network.DefaultPoolSize,
network.DefaultBufferSize,
false,
false,
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(true),
Expand Down Expand Up @@ -62,6 +88,7 @@ func main() {
},
nil,
nil,
proxy,
)

// Shutdown the server gracefully
Expand Down
72 changes: 30 additions & 42 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"os"
"sync"

"github.com/panjf2000/gnet/v2"
Expand All @@ -23,51 +22,32 @@ type Proxy interface {
}

type ProxyImpl struct {
pool Pool
connClients sync.Map
pool Pool
clients sync.Map

PoolSize int
Elastic bool
ReuseElasticClients bool
BufferSize int

// ClientConfig is used for elastic proxy and reconnection
ClientConfig *Client
}

var _ Proxy = &ProxyImpl{}

func NewProxy(size, bufferSize int, elastic, reuseElasticClients bool) *ProxyImpl {
func NewProxy(
pool Pool, elastic, reuseElasticClients bool, clientConfig *Client,
) *ProxyImpl {
proxy := ProxyImpl{
pool: NewPool(),
connClients: sync.Map{},

PoolSize: size,
clients: sync.Map{},
Elastic: elastic,
ReuseElasticClients: reuseElasticClients,
ClientConfig: clientConfig,
}

if proxy.Elastic {
return &proxy
}

if bufferSize == 0 {
proxy.BufferSize = DefaultBufferSize
}

for i := 0; i < size; i++ {
client := NewClient("tcp", "localhost:5432", proxy.BufferSize)
if client != nil {
if err := proxy.pool.Put(client); err != nil {
logrus.Panic(err)
}
}
}

logrus.Infof("There are %d clients in the pool", len(proxy.pool.ClientIDs()))
if len(proxy.pool.ClientIDs()) != size {
logrus.Error(
"The pool size is incorrect, either because " +
"the clients are cannot connect (no network connectivity) " +
"or the server is not running")
os.Exit(1)
if pool != nil {
proxy.pool = pool
} else {
proxy.pool = NewPool()
}

return &proxy
Expand All @@ -81,7 +61,11 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error {
// Pool is exhausted
if pr.Elastic {
// Create a new client
client = NewClient("tcp", "localhost:5432", pr.BufferSize)
client = NewClient(
pr.ClientConfig.Network,
pr.ClientConfig.Address,
pr.ClientConfig.ReceiveBufferSize,
)
logrus.Debugf("Reused the client %s by putting it back in the pool", client.ID)
} else {
return ErrPoolExhausted
Expand All @@ -93,7 +77,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error {
}

if client.ID != "" {
pr.connClients.Store(gconn, client)
pr.clients.Store(gconn, client)
logrus.Debugf("Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String())
} else {
return ErrClientNotConnected
Expand All @@ -107,12 +91,12 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error {

func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error {
var client *Client
if cl, ok := pr.connClients.Load(gconn); ok {
if cl, ok := pr.clients.Load(gconn); ok {
if c, ok := cl.(*Client); ok {
client = c
}
}
pr.connClients.Delete(gconn)
pr.clients.Delete(gconn)

// TODO: The connection is unstable when I put the client back in the pool
// If the client is not in the pool, put it back
Expand Down Expand Up @@ -142,7 +126,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn, onIncomingTraffic, onOutgoingT
// that listens for data from the server and sends it to the client

var client *Client
if c, ok := pr.connClients.Load(gconn); ok {
if c, ok := pr.clients.Load(gconn); ok {
if cl, ok := c.(*Client); ok {
client = cl
}
Expand Down Expand Up @@ -184,7 +168,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn, onIncomingTraffic, onOutgoingT
// Reconnect the client
client = pr.Reconnect(client)
// Store the client in the map, replacing the old one
pr.connClients.Store(gconn, client)
pr.clients.Store(gconn, client)
return err
}

Expand All @@ -211,7 +195,11 @@ func (pr *ProxyImpl) Reconnect(cl *Client) *Client {
if cl != nil && cl.ID != "" {
cl.Close()
}
return NewClient("tcp", "localhost:5432", pr.BufferSize)
return NewClient(
pr.ClientConfig.Network,
pr.ClientConfig.Address,
pr.ClientConfig.ReceiveBufferSize,
)
}

func (pr *ProxyImpl) Shutdown() {
Expand All @@ -228,7 +216,7 @@ func (pr *ProxyImpl) Shutdown() {

func (pr *ProxyImpl) Size() int {
var size int
pr.connClients.Range(func(_, _ interface{}) bool {
pr.clients.Range(func(_, _ interface{}) bool {
size++
return true
})
Expand Down
39 changes: 33 additions & 6 deletions network/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,65 @@ func TestNewProxy(t *testing.T) {
}
}()

proxy := NewProxy(1, DefaultBufferSize, false, false)
// Create a connection pool
pool := NewPool()
assert.NoError(t, pool.Put(NewClient("tcp", "localhost:5432", DefaultBufferSize)))

// Create a proxy with a fixed buffer pool
proxy := NewProxy(pool, false, false, nil)

assert.NotNil(t, proxy)
assert.Equal(t, 0, proxy.Size(), "Proxy should have no connected clients")
assert.Equal(t, 1, len(proxy.pool.ClientIDs()))
assert.NotEqual(t, "", proxy.pool.ClientIDs()[0])
assert.Equal(t, 1, proxy.PoolSize)
assert.Equal(t, false, proxy.Elastic)
assert.Equal(t, false, proxy.ReuseElasticClients)

proxy.pool.Close()
}

func TestNewProxyElastic(t *testing.T) {
proxy := NewProxy(1, DefaultBufferSize, true, false)
// Create a connection pool
pool := NewPool()

// Create a proxy with an elastic buffer pool
proxy := NewProxy(pool, true, false, &Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: DefaultBufferSize,
})

assert.NotNil(t, proxy)
assert.Equal(t, 0, proxy.Size())
assert.Equal(t, 0, len(proxy.pool.ClientIDs()))
assert.Equal(t, 1, proxy.PoolSize)
assert.Equal(t, true, proxy.Elastic)
assert.Equal(t, false, proxy.ReuseElasticClients)
assert.Equal(t, "tcp", proxy.ClientConfig.Network)
assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address)
assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize)

proxy.pool.Close()
}

func TestNewProxyElasticReuse(t *testing.T) {
proxy := NewProxy(1, DefaultBufferSize, true, true)
// Create a connection pool
pool := NewPool()

// Create a proxy with an elastic buffer pool
proxy := NewProxy(pool, true, true, &Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: DefaultBufferSize,
})

assert.NotNil(t, proxy)
assert.Equal(t, 0, proxy.Size())
assert.Equal(t, 0, len(proxy.pool.ClientIDs()))
assert.Equal(t, 1, proxy.PoolSize)
assert.Equal(t, true, proxy.Elastic)
assert.Equal(t, true, proxy.ReuseElasticClients)
assert.Equal(t, "tcp", proxy.ClientConfig.Network)
assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address)
assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize)

proxy.pool.Close()
}
51 changes: 18 additions & 33 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ type Server struct {
engine gnet.Engine
proxy Proxy

Network string // tcp/udp/unix
Address string
Options []gnet.Option
SoftLimit uint64
HardLimit uint64
Status Status
TickInterval int
PoolSize int
ElasticPool bool
ReuseElasticClients bool
BufferSize int
OnIncomingTraffic Traffic
OnOutgoingTraffic Traffic
Network string // tcp/udp/unix
Address string
Options []gnet.Option
SoftLimit uint64
HardLimit uint64
Status Status
TickInterval int
// PoolSize int
// ElasticPool bool
// ReuseElasticClients bool
// BufferSize int
OnIncomingTraffic Traffic
OnOutgoingTraffic Traffic
}

func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
s.engine = engine

// Create a proxy with a fixed/elastic buffer pool
s.proxy = NewProxy(s.PoolSize, s.BufferSize, s.ElasticPool, s.ReuseElasticClients)
// s.proxy = NewProxy(s.PoolSize, s.BufferSize, s.ElasticPool, s.ReuseElasticClients)

// Set the status to running
s.Status = Running
Expand Down Expand Up @@ -141,10 +141,10 @@ func (s *Server) IsRunning() bool {
func NewServer(
network, address string,
softLimit, hardLimit uint64,
tickInterval, poolSize, bufferSize int,
elasticPool, reuseElasticClients bool,
tickInterval int,
options []gnet.Option,
onIncomingTraffic, onOutgoingTraffic Traffic,
proxy Proxy,
) *Server {
server := Server{
Network: network,
Expand Down Expand Up @@ -194,23 +194,6 @@ func NewServer(
server.TickInterval = tickInterval
}

if poolSize == 0 {
server.PoolSize = DefaultPoolSize
logrus.Debugf("Client connections is not set, using the default value")
} else {
server.PoolSize = poolSize
}

if bufferSize == 0 {
server.BufferSize = DefaultBufferSize
logrus.Debugf("Buffer size is not set, using the default value")
} else {
server.BufferSize = bufferSize
}

server.ElasticPool = elasticPool
server.ReuseElasticClients = reuseElasticClients

if onIncomingTraffic == nil {
server.OnIncomingTraffic = func(gconn gnet.Conn, cl *Client, buf []byte, err error) error {
// TODO: Implement the traffic handler
Expand All @@ -231,5 +214,7 @@ func NewServer(
server.OnOutgoingTraffic = onOutgoingTraffic
}

server.proxy = proxy

return &server
}
Loading