Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,71 @@ jobs:
# uses: shogo82148/actions-goveralls@v1
# with:
# path-to-profile: profile.cov

test-plugins:
name: Test GatewayD Plugins
runs-on: ubuntu-latest
services:
postgres:
image: postgres
env:
POSTGRES_HOST: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout 🛎️
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Checkout test plugin 🛎️
uses: actions/checkout@v3
with:
repository: gatewayd-io/gatewayd-plugin-test
path: gatewayd-plugin-test
token: ${{ secrets.GH_PLUGIN_TOKEN }}

- name: Install Go 🧑‍💻
uses: actions/setup-go@v3
with:
go-version: '1.18'

- name: Build test plugin 🏗️
run: |
# Build GatewayD
make build
# Build test plugin
cd gatewayd-plugin-test && make build && cp gatewayd-plugin-test ../gdp-test && cd ..
export SHA256SUM=$(sha256sum gdp-test | awk '{print $1}')
cat <<EOF > gatewayd_plugins.yaml
gatewayd-plugin-test:
enabled: True
localPath: ./gdp-test
checksum: ${SHA256SUM}
EOF

- name: Run GatewayD with test plugin 🚀
run: |
./gatewayd run &

- name: Run a test with PSQL 🧪
run: |
sudo apt-get update
sudo apt-get install --yes --no-install-recommends postgresql-client
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -c "CREATE DATABASE gatewayd_test;"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "INSERT INTO test_table (name) VALUES ('test');"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "SELECT * FROM test_table;" | grep test
env:
DBNAME: gatewayd_test
PGUSER: postgres
PGPASSWORD: postgres
PGHOST: localhost
PGPORT: 15432
20 changes: 12 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ var runCmd = &cobra.Command{
// This is a notification hook, so we don't care about the result.
data, err := structpb.NewStruct(map[string]interface{}{
"timeFormat": loggerCfg.TimeFormat,
"level": loggerCfg.Level,
"output": loggerCfg.Output,
"level": loggerCfg.Level.String(),
"noColor": loggerCfg.NoColor,
})
if err != nil {
Expand Down Expand Up @@ -179,7 +178,11 @@ var runCmd = &cobra.Command{
proxyCfg, err := structpb.NewStruct(map[string]interface{}{
"elastic": elastic,
"reuseElasticClients": reuseElasticClients,
"clientConfig": elasticClientConfig,
"clientConfig": map[string]interface{}{
"network": elasticClientConfig.Network,
"address": elasticClientConfig.Address,
"receiveBufferSize": elasticClientConfig.ReceiveBufferSize,
},
})
if err != nil {
logger.Error().Err(err).Msg("Failed to convert proxy config to structpb")
Expand Down Expand Up @@ -240,19 +243,19 @@ var runCmd = &cobra.Command{
"address": serverConfig.Address,
"softLimit": serverConfig.SoftLimit,
"hardLimit": serverConfig.HardLimit,
"tickInterval": serverConfig.TickInterval,
"tickInterval": serverConfig.TickInterval.Seconds(),
"multiCore": serverConfig.MultiCore,
"lockOSThread": serverConfig.LockOSThread,
"enableTicker": serverConfig.EnableTicker,
"loadBalancer": serverConfig.LoadBalancer,
"loadBalancer": int(serverConfig.LoadBalancer),
"readBufferCap": serverConfig.ReadBufferCap,
"writeBufferCap": serverConfig.WriteBufferCap,
"socketRecvBuffer": serverConfig.SocketRecvBuffer,
"socketSendBuffer": serverConfig.SocketSendBuffer,
"reuseAddress": serverConfig.ReuseAddress,
"reusePort": serverConfig.ReusePort,
"tcpKeepAlive": serverConfig.TCPKeepAlive,
"tcpNoDelay": serverConfig.TCPNoDelay,
"tcpKeepAlive": serverConfig.TCPKeepAlive.Seconds(),
"tcpNoDelay": int(serverConfig.TCPNoDelay),
})
if err != nil {
logger.Error().Err(err).Msg("Failed to convert server config to structpb")
Expand Down Expand Up @@ -281,7 +284,8 @@ var runCmd = &cobra.Command{
for _, s := range signals {
if sig != s {
// Notify the hooks that the server is shutting down
signalCfg, err := structpb.NewStruct(map[string]interface{}{"signal": sig})
signalCfg, err := structpb.NewStruct(
map[string]interface{}{"signal": sig.String()})
if err != nil {
logger.Error().Err(err).Msg(
"Failed to convert signal config to structpb")
Expand Down
51 changes: 51 additions & 0 deletions logging/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package logging

import (
"bytes"
"encoding/json"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)

func TestNewLogger(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(
LoggerConfig{
Output: &buffer,
Level: zerolog.DebugLevel,
TimeFormat: zerolog.TimeFormatUnix,
NoColor: true,
},
)
assert.NotNil(t, logger)

var msg interface{}
err := json.Unmarshal(buffer.Bytes(), &msg)
assert.NoError(t, err)

if jsonMsg, ok := msg.(map[string]interface{}); ok {
// This is created when the logger is created and
// is used to test that the logger is working.
assert.Equal(t, "Created a new logger", jsonMsg["message"])
assert.Equal(t, "debug", jsonMsg["level"])
} else {
t.Fail()
}

buffer.Reset()

logger.Error().Str("key", "key").Msg("This is an error")
var msg2 interface{}
err = json.Unmarshal(buffer.Bytes(), &msg2)
assert.NoError(t, err)

if jsonMsg, ok := msg2.(map[string]interface{}); ok {
assert.Equal(t, "This is an error", jsonMsg["message"])
assert.Equal(t, "error", jsonMsg["level"])
assert.Equal(t, "key", jsonMsg["key"])
} else {
t.Fail()
}
}
46 changes: 26 additions & 20 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
s.logger.Debug().Msg("GatewayD is booting...")

onBootingData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"engine": engine,
"status": string(s.Status),
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -63,8 +62,7 @@ func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
s.Status = Running

onBootedData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"engine": engine,
"status": string(s.Status),
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -85,8 +83,10 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
s.logger.Debug().Msgf("GatewayD is opening a connection from %s", gconn.RemoteAddr().String())

onOpeningData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"gconn": gconn,
"client": map[string]interface{}{
"local": gconn.LocalAddr().String(),
"remote": gconn.RemoteAddr().String(),
},
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand Down Expand Up @@ -117,8 +117,10 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
}

onOpenedData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"gconn": gconn,
"client": map[string]interface{}{
"local": gconn.LocalAddr().String(),
"remote": gconn.RemoteAddr().String(),
},
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -137,9 +139,11 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {
s.logger.Debug().Msgf("GatewayD is closing a connection from %s", gconn.RemoteAddr().String())

onClosingData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"gconn": gconn,
"error": err,
"client": map[string]interface{}{
"local": gconn.LocalAddr().String(),
"remote": gconn.RemoteAddr().String(),
},
"error": err,
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -160,9 +164,11 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {
}

onClosedData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"gconn": gconn,
"error": err,
"client": map[string]interface{}{
"local": gconn.LocalAddr().String(),
"remote": gconn.RemoteAddr().String(),
},
"error": err,
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -179,8 +185,10 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {

func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action {
onTrafficData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"gconn": gconn,
"client": map[string]interface{}{
"local": gconn.LocalAddr().String(),
"remote": gconn.RemoteAddr().String(),
},
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -205,8 +213,7 @@ func (s *Server) OnShutdown(engine gnet.Engine) {
s.logger.Debug().Msg("GatewayD is shutting down...")

onShutdownData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"engine": engine,
"connections": s.engine.CountConnections(),
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -227,7 +234,7 @@ func (s *Server) OnTick() (time.Duration, gnet.Action) {
s.logger.Info().Msgf("Active connections: %d", s.engine.CountConnections())

onTickData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"connections": s.engine.CountConnections(),
})
if err != nil {
s.logger.Error().Err(err).Msg("Failed to create structpb")
Expand All @@ -254,7 +261,6 @@ func (s *Server) Run() error {
// Since gnet.Run is blocking, we need to run OnRun before it
//nolint:nestif
if onRunData, err := structpb.NewStruct(map[string]interface{}{
"server": s,
"address": addr,
"error": err,
}); err != nil {
Expand Down
59 changes: 0 additions & 59 deletions plugin/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,65 +77,6 @@ func Test_HookConfig_Run(t *testing.T) {
assert.Nil(t, err)
}

func Test_Verify(t *testing.T) {
params, err := structpb.NewStruct(
map[string]interface{}{
"test": "test",
},
)
assert.Nil(t, err)

returnVal, err := structpb.NewStruct(
map[string]interface{}{
"test": "test",
},
)
assert.Nil(t, err)

assert.True(t, Verify(params, returnVal))
}

func Test_Verify_fail(t *testing.T) {
data := [][]map[string]interface{}{
{
{
"test": "test",
},
{
"test": "test",
"test2": "test2",
},
},
{
{
"test": "test",
"test2": "test2",
},
{
"test": "test",
},
},
{
{
"test": "test",
"test2": "test2",
},
{
"test": "test",
"test3": "test3",
},
},
}

for _, d := range data {
params, err := structpb.NewStruct(d[0])
assert.Nil(t, err)
returnVal, err := structpb.NewStruct(d[1])
assert.Nil(t, err)
assert.False(t, Verify(params, returnVal))
}
}

func Test_HookConfig_Run_PassDown(t *testing.T) {
hooks := NewHookConfig()
// The result of the hook will be nil and will be passed down to the next hook.
Expand Down
12 changes: 9 additions & 3 deletions plugin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) {
Managed: true,
MinPort: DefaultMinPort,
MaxPort: DefaultMaxPort,
// TODO: Enable GRPC DialOptions
// GRPCDialOptions: []grpc.DialOption{
// grpc.WithInsecure(),
// },
Expand Down Expand Up @@ -189,9 +190,14 @@ func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) {
&plugin.Hooks); err != nil {
reg.hooksConfig.Logger.Debug().Err(err).Msg("Failed to decode plugin hooks")
}
if err := mapstructure.Decode(metadata.Fields["config"].GetStructValue().AsMap(),
&plugin.Config); err != nil {
reg.hooksConfig.Logger.Debug().Err(err).Msg("Failed to decode plugin config")

plugin.Config = make(map[string]string)
for key, value := range metadata.Fields["config"].GetStructValue().AsMap() {
if val, ok := value.(string); ok {
plugin.Config[key] = val
} else {
reg.hooksConfig.Logger.Debug().Msgf("Failed to decode plugin config: %s", key)
}
}

reg.Add(plugin)
Expand Down
Loading