Skip to content

Commit f1c0924

Browse files
committed
Implement reconnect mechanism on the proxy
Move SQL commit closer to the insert command Use executor.submit instead of map
1 parent fec295b commit f1c0924

File tree

3 files changed

+33
-17
lines changed

3 files changed

+33
-17
lines changed

client_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ def worker(id):
1414
conn.execute(
1515
"CREATE TABLE IF NOT EXISTS test (id serial PRIMARY KEY, num integer, data varchar);")
1616
conn.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (id, "abc'def"))
17+
conn.execute("COMMIT")
1718

1819
for row in conn.execute("SELECT * FROM test;"):
1920
print("ID=%s, NUM=%s, DATA=%s" % row)
2021

2122
# conn.execute("DROP TABLE test;")
22-
conn.execute("COMMIT")
2323
conn.close()
2424
except KeyboardInterrupt:
2525
if conn:
@@ -32,7 +32,8 @@ def worker(id):
3232

3333

3434
if __name__ == '__main__':
35-
with ThreadPoolExecutor(max_workers=2) as executor:
35+
with ThreadPoolExecutor(max_workers=10) as executor:
3636
# Create 11 connections to the server and run queries in parallel
3737
# This will cause the server to crash
38-
executor.map(worker, range(1, 12, 1))
38+
for i in range(11):
39+
executor.submit(worker, i)

network/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type Client struct {
2222
// unexpected EOF on client connection with an open transaction
2323

2424
func NewClient(network, address string, receiveBufferSize int) *Client {
25+
// TODO: Resolve the address and return an error if it can't be resolved
26+
2527
c := Client{
2628
Network: network,
2729
Address: address,

network/proxy.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Proxy interface {
1313
Connect(c gnet.Conn) error
1414
Disconnect(c gnet.Conn) error
1515
PassThrough(c gnet.Conn) error
16+
Reconnect(cl *Client) *Client
1617
Shutdown()
1718
Size() int
1819
}
@@ -92,25 +93,23 @@ func (pr *ProxyImpl) Connect(c gnet.Conn) error {
9293

9394
func (pr *ProxyImpl) Disconnect(c gnet.Conn) error {
9495
var client *Client
95-
if c, ok := pr.connClients.Load(c); ok {
96-
client = c.(*Client)
96+
if cl, ok := pr.connClients.Load(c); ok {
97+
client = cl.(*Client)
9798
}
9899
pr.connClients.Delete(c)
99100

100101
// TODO: The connection is unstable when I put the client back in the pool
101102
// If the client is not in the pool, put it back
102-
if client != nil && client.ID != "" {
103-
if pr.Elastic && pr.ReuseElasticClients {
103+
104+
if pr.Elastic && pr.ReuseElasticClients || !pr.Elastic {
105+
client = pr.Reconnect(client)
106+
if client != nil && client.ID != "" {
104107
if err := pr.pool.Put(client); err != nil {
105108
return err
106109
}
107-
} else {
108-
// FIXME: Close the client connection, as it is not reusable???
109-
pr.pool.Put(client)
110-
// pr.pool.Put(NewClient("tcp", "localhost:5432", 4096))
111110
}
112111
} else {
113-
return errors.New("client is not connected (disconnect)")
112+
client.Close()
114113
}
115114

116115
logrus.Infof("[D] There are %d clients in the pool", len(pr.pool.ClientIDs()))
@@ -144,11 +143,17 @@ func (pr *ProxyImpl) PassThrough(c gnet.Conn) error {
144143
// Receive the response from the server
145144
size, response, err := client.Receive()
146145
if err != nil {
147-
return err
148-
}
149-
150-
if size == 0 {
151-
return errors.New("no response from the server")
146+
// FIXME: Is this the right way to handle this error?
147+
if err.Error() == "EOF" {
148+
logrus.Error("The client is not connected to the server anymore")
149+
// Either the client is not connected to the server anymore or
150+
// server forceful closed the connection
151+
// Reconnect the client
152+
client = pr.Reconnect(client)
153+
// Store the client in the map, replacing the old one
154+
pr.connClients.Store(c, client)
155+
}
156+
// return err
152157
}
153158

154159
// Write the response to the incoming connection
@@ -157,6 +162,14 @@ func (pr *ProxyImpl) PassThrough(c gnet.Conn) error {
157162
return nil
158163
}
159164

165+
func (pr *ProxyImpl) Reconnect(cl *Client) *Client {
166+
// Close the client
167+
if cl != nil && cl.ID != "" {
168+
cl.Close()
169+
}
170+
return NewClient("tcp", "localhost:5432", 4096)
171+
}
172+
160173
func (pr *ProxyImpl) Shutdown() {
161174
pr.pool.Shutdown()
162175
logrus.Info("All busy client connections have been closed")

0 commit comments

Comments
 (0)