Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cmd

import (
"os"

"github.com/spf13/cobra"
)

var rootCmd = &cobra.Command{
Use: "gatewayd",
Short: "A cloud-native database gateway and framework for building data-driven applications",
Long: "GatewayD is a cloud-native database gateway and framework for building data-driven " +
"applications. It sits in between your database(s) and your database client(s) and " +
"proxies all queries to and their responses from the database.",
}

func Execute() {
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
}
}

func init() {}
136 changes: 136 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package cmd

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/gatewayd-io/gatewayd/logging"
"github.com/gatewayd-io/gatewayd/network"
"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
)

const (
DefaultTCPKeepAlive = 3 * time.Second
)

// runCmd represents the run command.
var runCmd = &cobra.Command{
Use: "run",
Short: "Run a gatewayd instance",
Run: func(cmd *cobra.Command, args []string) {
// Create a logger
logger := logging.NewLogger(nil, zerolog.TimeFormatUnix, zerolog.InfoLevel, true)

// Create a pool
pool := network.NewPool(logger)

// Add a client to the pool
for i := 0; i < network.DefaultPoolSize; i++ {
client := network.NewClient("tcp", "localhost:5432", network.DefaultBufferSize, logger)
if client != nil {
if err := pool.Put(client); err != nil {
logger.Panic().Err(err).Msg("Failed to add client to pool")
}
}
}

// Verify that the pool is properly populated
logger.Debug().Msgf("There are %d clients in the pool", len(pool.ClientIDs()))
if len(pool.ClientIDs()) != network.DefaultPoolSize {
logger.Error().Msg(
"The pool size is incorrect, either because " +
"the clients are cannot connect (no network connectivity) " +
"or the server is not running. Exiting...")
os.Exit(1)
}

// Create a prefork proxy with the pool of clients
proxy := network.NewProxy(pool, false, false, &network.Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: network.DefaultBufferSize,
}, logger)

// Create a server
server := network.NewServer(
"tcp",
"0.0.0.0:15432",
0,
0,
network.DefaultTickInterval,
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(true),
gnet.WithLockOSThread(false),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

// Can be used to send keepalive messages to the client.
gnet.WithTicker(false),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(gnet.RoundRobin),

// Logger options
// TODO: This is a temporary solution and will be replaced.
// gnet.WithLogger(logrus.New()),
// gnet.WithLogPath("./gnet.log"),
// gnet.WithLogLevel(zapcore.DebugLevel),

// Buffer options
// TODO: This should be configurable and optimized.
gnet.WithReadBufferCap(network.DefaultBufferSize),
gnet.WithWriteBufferCap(network.DefaultBufferSize),
gnet.WithSocketRecvBuffer(network.DefaultBufferSize),
gnet.WithSocketSendBuffer(network.DefaultBufferSize),

// TCP options
gnet.WithReuseAddr(true),
gnet.WithReusePort(true),
gnet.WithTCPKeepAlive(DefaultTCPKeepAlive),
gnet.WithTCPNoDelay(gnet.TCPNoDelay),
},
nil,
nil,
proxy,
logger,
)

// Shutdown the server gracefully
var signals []os.Signal
signals = append(signals,
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGABRT,
syscall.SIGQUIT,
syscall.SIGHUP,
syscall.SIGINT,
)
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, signals...)
go func() {
for sig := range signalsCh {
for _, s := range signals {
if sig != s {
server.Shutdown()
os.Exit(0)
}
}
}
}()

// Run the server
if err := server.Run(); err != nil {
logger.Error().Err(err).Msg("Failed to start server")
}
},
}

func init() {
rootCmd.AddCommand(runCmd)
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ require (
github.com/fergusstrange/embedded-postgres v1.19.0
github.com/panjf2000/gnet/v2 v2.1.2
github.com/rs/zerolog v1.28.0
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fergusstrange/embedded-postgres v1.19.0 h1:NqDufJHeA03U7biULlPHZ0pZ10/mDOMKPILEpT50Fyk=
github.com/fergusstrange/embedded-postgres v1.19.0/go.mod h1:0B+3bPsMvcNgR9nN+bdM2x9YaNYDnf3ksUqYp1OAub0=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -35,6 +38,11 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
124 changes: 2 additions & 122 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,127 +1,7 @@
package main

import (
"os"
"os/signal"
"syscall"
"time"
import "github.com/gatewayd-io/gatewayd/cmd"

"github.com/gatewayd-io/gatewayd/logging"
"github.com/gatewayd-io/gatewayd/network"
"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
)

const (
DefaultTCPKeepAlive = 3 * time.Second
)

//nolint:funlen
func main() {
// Create a logger
logger := logging.NewLogger(nil, zerolog.TimeFormatUnix, zerolog.InfoLevel, true)

// Create a pool
pool := network.NewPool(logger)

// Add a client to the pool
for i := 0; i < network.DefaultPoolSize; i++ {
client := network.NewClient("tcp", "localhost:5432", network.DefaultBufferSize, logger)
if client != nil {
if err := pool.Put(client); err != nil {
logger.Panic().Err(err).Msg("Failed to add client to pool")
}
}
}

// Verify that the pool is properly populated
logger.Debug().Msgf("There are %d clients in the pool", len(pool.ClientIDs()))
if len(pool.ClientIDs()) != network.DefaultPoolSize {
logger.Error().Msg(
"The pool size is incorrect, either because " +
"the clients are cannot connect (no network connectivity) " +
"or the server is not running. Exiting...")
os.Exit(1)
}

// Create a prefork proxy with the pool of clients
proxy := network.NewProxy(pool, false, false, &network.Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: network.DefaultBufferSize,
}, logger)

// Create a server
server := network.NewServer(
"tcp",
"0.0.0.0:15432",
0,
0,
network.DefaultTickInterval,
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(true),
gnet.WithLockOSThread(false),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

// Can be used to send keepalive messages to the client.
gnet.WithTicker(false),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(gnet.RoundRobin),

// Logger options
// TODO: This is a temporary solution and will be replaced.
// gnet.WithLogger(logrus.New()),
// gnet.WithLogPath("./gnet.log"),
// gnet.WithLogLevel(zapcore.DebugLevel),

// Buffer options
// TODO: This should be configurable and optimized.
gnet.WithReadBufferCap(network.DefaultBufferSize),
gnet.WithWriteBufferCap(network.DefaultBufferSize),
gnet.WithSocketRecvBuffer(network.DefaultBufferSize),
gnet.WithSocketSendBuffer(network.DefaultBufferSize),

// TCP options
gnet.WithReuseAddr(true),
gnet.WithReusePort(true),
gnet.WithTCPKeepAlive(DefaultTCPKeepAlive),
gnet.WithTCPNoDelay(gnet.TCPNoDelay),
},
nil,
nil,
proxy,
logger,
)

// Shutdown the server gracefully
var signals []os.Signal
signals = append(signals,
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGABRT,
syscall.SIGQUIT,
syscall.SIGHUP,
syscall.SIGINT,
)
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, signals...)
go func() {
for sig := range signalsCh {
for _, s := range signals {
if sig != s {
server.Shutdown()
os.Exit(0)
}
}
}
}()

// Run the server
if err := server.Run(); err != nil {
logger.Error().Err(err).Msg("Failed to start server")
}
cmd.Execute()
}