183 changes: 183 additions & 0 deletions cmd/config_parser.go
@@ -0,0 +1,183 @@
package cmd

import (
	"os"
	"time"

	"github.com/gatewayd-io/gatewayd/logging"
	"github.com/gatewayd-io/gatewayd/network"
	"github.com/knadh/koanf"
	"github.com/panjf2000/gnet/v2"
	"github.com/rs/zerolog"
)

// Global koanf instance. Using "." as the key path delimiter.
var konfig = koanf.New(".")

func getPath(path string) string {
	ref := konfig.String(path)
	if konfig.Exists(path) && konfig.StringMap(ref) != nil {
		return ref
	}

	return path
}

// func resolvePath(path string) map[string]string {
//	ref := getPath(path)
//	if ref != path {
//		return konfig.StringMap(ref)
//	}
//	return nil
// }

func loggerConfig() logging.LoggerConfig {
	cfg := logging.LoggerConfig{}
	switch konfig.String("loggers.logger.output") {
	case "stdout":
		cfg.Output = os.Stdout
	case "console":
	default:
		cfg.Output = nil
	}

	switch konfig.String("loggers.logger.timeFormat") {
	case "unixms":
		cfg.TimeFormat = zerolog.TimeFormatUnixMs
	case "unixmicro":
		cfg.TimeFormat = zerolog.TimeFormatUnixMicro
	case "unixnano":
		cfg.TimeFormat = zerolog.TimeFormatUnixNano
	case "unix":
		cfg.TimeFormat = zerolog.TimeFormatUnix
	default:
		cfg.TimeFormat = zerolog.TimeFormatUnix
	}

	switch konfig.String("loggers.logger.level") {
	case "debug":
		cfg.Level = zerolog.DebugLevel
	case "info":
		cfg.Level = zerolog.InfoLevel
	case "warn":
		cfg.Level = zerolog.WarnLevel
	case "error":
		cfg.Level = zerolog.ErrorLevel
	case "fatal":
		cfg.Level = zerolog.FatalLevel
	case "panic":
		cfg.Level = zerolog.PanicLevel
	case "disabled":
		cfg.Level = zerolog.Disabled
	case "trace":
		cfg.Level = zerolog.TraceLevel
	default:
		cfg.Level = zerolog.InfoLevel
	}

	cfg.NoColor = konfig.Bool("loggers.logger.noColor")

	return cfg
}

func poolConfig() (int, *network.Client) {
	poolSize := konfig.Int("pool.size")
	if poolSize == 0 {
		poolSize = network.DefaultPoolSize
	}

	ref := getPath("pool.client")
	net := konfig.String(ref + ".network")
	address := konfig.String(ref + ".address")
	receiveBufferSize := konfig.Int(ref + ".receiveBufferSize")

	return poolSize, &network.Client{
		Network:           net,
		Address:           address,
		ReceiveBufferSize: receiveBufferSize,
	}
}

func proxyConfig() (bool, bool, *network.Client) {
	elastic := konfig.Bool("proxy.elastic")
	reuseElasticClients := konfig.Bool("proxy.reuseElasticClients")

	ref := getPath("pool.client")
	net := konfig.String(ref + ".network")
	address := konfig.String(ref + ".address")
	receiveBufferSize := konfig.Int(ref + ".receiveBufferSize")

	return elastic, reuseElasticClients, &network.Client{
		Network:           net,
		Address:           address,
		ReceiveBufferSize: receiveBufferSize,
	}
}

type ServerConfig struct {
	Network          string
	Address          string
	SoftLimit        uint64
	HardLimit        uint64
	EnableTicker     bool
	MultiCore        bool
	LockOSThread     bool
	ReuseAddress     bool
	ReusePort        bool
	LoadBalancer     gnet.LoadBalancing
	TickInterval     int
	ReadBufferCap    int
	WriteBufferCap   int
	SocketRecvBuffer int
	SocketSendBuffer int
	TCPKeepAlive     time.Duration
	TCPNoDelay       gnet.TCPSocketOpt
	// OnIncomingTraffic string
	// OnOutgoingTraffic string
}

var loadBalancer = map[string]gnet.LoadBalancing{
	"roundrobin":       gnet.RoundRobin,
	"leastconnections": gnet.LeastConnections,
	"sourceaddrhash":   gnet.SourceAddrHash,
}

func getLoadBalancer(name string) gnet.LoadBalancing {
	if lb, ok := loadBalancer[name]; ok {
		return lb
	}

	return gnet.RoundRobin
}

func getTCPNoDelay() gnet.TCPSocketOpt {
	if konfig.Bool("server.tcpNoDelay") {
		return gnet.TCPNoDelay
	}

	return gnet.TCPDelay
}

func serverConfig() *ServerConfig {
	return &ServerConfig{
		Network:          konfig.String("server.network"),
		Address:          konfig.String("server.address"),
		SoftLimit:        uint64(konfig.Int64("server.softLimit")),
		HardLimit:        uint64(konfig.Int64("server.hardLimit")),
		EnableTicker:     konfig.Bool("server.enableTicker"),
		TickInterval:     konfig.Int("server.tickInterval"),
		MultiCore:        konfig.Bool("server.multiCore"),
		LockOSThread:     konfig.Bool("server.lockOSThread"),
		LoadBalancer:     getLoadBalancer(konfig.String("server.loadBalancer")),
		ReadBufferCap:    konfig.Int("server.readBufferCap"),
		WriteBufferCap:   konfig.Int("server.writeBufferCap"),
		SocketRecvBuffer: konfig.Int("server.socketRecvBuffer"),
		SocketSendBuffer: konfig.Int("server.socketSendBuffer"),
		ReuseAddress:     konfig.Bool("server.reuseAddress"),
		ReusePort:        konfig.Bool("server.reusePort"),
		TCPKeepAlive:     konfig.Duration("server.tcpKeepAlive"),
		TCPNoDelay:       getTCPNoDelay(),
		// OnIncomingTraffic: konfig.String("server.onIncomingTraffic"),
		// OnOutgoingTraffic: konfig.String("server.onOutgoingTraffic"),
	}
}
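
Note on the reference-style keys in gatewayd.yaml (e.g. pool.client: clients.client1): the value stored at such a key is itself a koanf key path, and getPath returns it so that poolConfig/proxyConfig can read the client fields through the resolved prefix. A minimal, self-contained sketch of that lookup, using koanf's confmap provider purely for illustration (the real command loads gatewayd.yaml with the file provider and YAML parser):

package main

import (
	"fmt"

	"github.com/knadh/koanf"
	"github.com/knadh/koanf/providers/confmap"
)

func main() {
	k := koanf.New(".")

	// Flattened stand-in for the shipped gatewayd.yaml.
	_ = k.Load(confmap.Provider(map[string]interface{}{
		"clients.client1.network": "tcp",
		"clients.client1.address": "localhost:5432",
		"pool.client":             "clients.client1",
	}, "."), nil)

	// The same resolution getPath performs: the value at "pool.client"
	// is another key path, not an inline client config.
	ref := k.String("pool.client") // "clients.client1"
	if k.Exists("pool.client") && k.StringMap(ref) != nil {
		fmt.Println(k.String(ref+".network"), k.String(ref+".address"))
		// Output: tcp localhost:5432
	}
}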
2 changes: 0 additions & 2 deletions cmd/root.go
@@ -19,5 +19,3 @@ func Execute() {
os.Exit(1)
}
}

func init() {}
81 changes: 36 additions & 45 deletions cmd/run.go
@@ -8,72 +8,60 @@ import (

"github.com/gatewayd-io/gatewayd/logging"
"github.com/gatewayd-io/gatewayd/network"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
)

const (
DefaultTCPKeepAlive = 3 * time.Second
)

var configFile string

// runCmd represents the run command.
var runCmd = &cobra.Command{
Use: "run",
Short: "Run a gatewayd instance",
Run: func(cmd *cobra.Command, args []string) {
// Create a logger
logger := logging.NewLogger(nil, zerolog.TimeFormatUnix, zerolog.InfoLevel, true)

// Create a pool
pool := network.NewPool(logger)

// Add a client to the pool
for i := 0; i < network.DefaultPoolSize; i++ {
client := network.NewClient("tcp", "localhost:5432", network.DefaultBufferSize, logger)
if client != nil {
if err := pool.Put(client); err != nil {
logger.Panic().Err(err).Msg("Failed to add client to pool")
}
if f, err := cmd.Flags().GetString("config"); err == nil {
if err := konfig.Load(file.Provider(f), yaml.Parser()); err != nil {
panic(err)
}
}

// Verify that the pool is properly populated
logger.Debug().Msgf("There are %d clients in the pool", len(pool.ClientIDs()))
if len(pool.ClientIDs()) != network.DefaultPoolSize {
logger.Error().Msg(
"The pool size is incorrect, either because " +
"the clients are cannot connect (no network connectivity) " +
"or the server is not running. Exiting...")
os.Exit(1)
}
// Create a new logger from the config
logger := logging.NewLogger(loggerConfig())

// Create and initialize a pool of connections
poolSize, poolClientConfig := poolConfig()
pool := network.NewPool(logger, poolSize, poolClientConfig)

// Create a prefork proxy with the pool of clients
proxy := network.NewProxy(pool, false, false, &network.Client{
Network: "tcp",
Address: "localhost:5432",
ReceiveBufferSize: network.DefaultBufferSize,
}, logger)
elastic, reuseElasticClients, elasticClientConfig := proxyConfig()
proxy := network.NewProxy(pool, elastic, reuseElasticClients, elasticClientConfig, logger)

// Create a server
serverConfig := serverConfig()
server := network.NewServer(
"tcp",
"0.0.0.0:15432",
0,
0,
network.DefaultTickInterval,
serverConfig.Network,
serverConfig.Address,
serverConfig.SoftLimit,
serverConfig.HardLimit,
serverConfig.TickInterval,
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(true),
gnet.WithLockOSThread(false),
gnet.WithMulticore(serverConfig.MultiCore),
gnet.WithLockOSThread(serverConfig.LockOSThread),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

// Can be used to send keepalive messages to the client.
gnet.WithTicker(false),
gnet.WithTicker(serverConfig.EnableTicker),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(gnet.RoundRobin),
gnet.WithLoadBalancing(serverConfig.LoadBalancer),

// Logger options
// TODO: This is a temporary solution and will be replaced.
@@ -83,16 +71,16 @@ var runCmd = &cobra.Command{

// Buffer options
// TODO: This should be configurable and optimized.
gnet.WithReadBufferCap(network.DefaultBufferSize),
gnet.WithWriteBufferCap(network.DefaultBufferSize),
gnet.WithSocketRecvBuffer(network.DefaultBufferSize),
gnet.WithSocketSendBuffer(network.DefaultBufferSize),
gnet.WithReadBufferCap(serverConfig.ReadBufferCap),
gnet.WithWriteBufferCap(serverConfig.WriteBufferCap),
gnet.WithSocketRecvBuffer(serverConfig.SocketRecvBuffer),
gnet.WithSocketSendBuffer(serverConfig.SocketSendBuffer),

// TCP options
gnet.WithReuseAddr(true),
gnet.WithReusePort(true),
gnet.WithTCPKeepAlive(DefaultTCPKeepAlive),
gnet.WithTCPNoDelay(gnet.TCPNoDelay),
gnet.WithReuseAddr(serverConfig.ReuseAddress),
gnet.WithReusePort(serverConfig.ReusePort),
gnet.WithTCPKeepAlive(serverConfig.TCPKeepAlive),
gnet.WithTCPNoDelay(serverConfig.TCPNoDelay),
},
nil,
nil,
@@ -133,4 +121,7 @@ var runCmd = &cobra.Command{

func init() {
rootCmd.AddCommand(runCmd)

runCmd.PersistentFlags().StringVarP(
&configFile, "config", "c", "./gatewayd.yaml", "config file (default is ./gatewayd.yaml)")
}
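
Taken together, run.go no longer hard-codes the logger, pool, proxy, and server settings: the new -c/--config flag (default ./gatewayd.yaml) points at the YAML file, which is loaded into the global koanf instance and fed through the config_parser.go helpers above. Assuming the binary is named gatewayd, this would be invoked as something like: gatewayd run -c ./gatewayd.yaml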
65 changes: 65 additions & 0 deletions gatewayd.yaml
@@ -0,0 +1,65 @@
# Gatewayd config
# This file is used to configure the gatewayd daemon.

# Loggers config
loggers:
  logger:
    output: "null" # stdout or file or console
    # Implementing file output
    # file : ./logs/gatewayd.log
    level: "debug" # panic, fatal, error, warn, info, debug, trace
    noColor: True
    timeFormat: "unix"

clients:
  client1:
    network: tcp
    address: localhost:5432
    receiveBufferSize: 4096

# Pool config
pool:
  # Use the logger config passed here
  # i.e. don't assume it's the same as the logger config above
  logger: loggers.logger
  size: 10
  # Database configs for the connection pool
  client: clients.client1

# Proxy config
proxy:
  # Use the logger config passed here
  # i.e. don't assume it's the same as the logger config above
  logger: loggers.logger
  # Use the pool config passed here
  # i.e. don't assume it's the same as the pool config above
  pool: pool
  elastic: False
  reuseElasticClients: False
  elasticClient: clients.client1

server:
  network: tcp
  address: 0.0.0.0:15432
  # softLimit: 0
  # hardLimit: 0

  logger: loggers.logger
  proxy: proxy

  enableTicker: False
  tickInterval: 5 # seconds
  multiCore: True
  lockOSThread: False
  loadBalancer: roundrobin
  readBufferCap: 4096
  writeBufferCap: 4096
  socketRecvBuffer: 4096
  socketSendBuffer: 4096
  reuseAddress: True
  reusePort: True
  tcpKeepAlive: 3 # seconds
  tcpNoDelay: True

  incomingTrafficHandler: "null"
  outgoingTrafficHandler: "null"
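
The logger, pool, proxy, and client values that name other sections (for example client: clients.client1) are koanf key paths rather than inline blocks. In this change only the pool.client reference is resolved through getPath; loggerConfig reads loggers.logger directly, and the commented-out resolvePath in config_parser.go suggests broader reference resolution may follow later.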