1
1
package cmd
2
2
3
3
import (
4
+ "context"
4
5
"os"
5
6
"os/signal"
6
7
"syscall"
7
8
"time"
8
9
10
+ gerr "github.com/gatewayd-io/gatewayd/errors"
9
11
"github.com/gatewayd-io/gatewayd/logging"
10
12
"github.com/gatewayd-io/gatewayd/network"
11
13
"github.com/gatewayd-io/gatewayd/plugin"
12
14
"github.com/gatewayd-io/gatewayd/pool"
13
- "github.com/knadh/koanf "
15
+ goplugin "github.com/hashicorp/go-plugin "
14
16
"github.com/knadh/koanf/parsers/yaml"
15
17
"github.com/knadh/koanf/providers/confmap"
16
18
"github.com/knadh/koanf/providers/file"
17
19
"github.com/panjf2000/gnet/v2"
20
+ "github.com/rs/zerolog"
18
21
"github.com/spf13/cobra"
22
+ "google.golang.org/protobuf/types/known/structpb"
19
23
)
20
24
21
25
const (
22
26
DefaultTCPKeepAlive = 3 * time .Second
23
27
)
24
28
25
29
var (
26
- configFile string
27
- hooksConfig = plugin .NewHookConfig ()
30
+ globalConfigFile string
31
+ pluginConfigFile string
32
+ )
33
+
34
+ var (
35
+ hooksConfig = plugin .NewHookConfig ()
36
+ DefaultLogger = logging .NewLogger (logging.LoggerConfig {Level : zerolog .DebugLevel })
37
+ pluginRegistry = plugin .NewRegistry (hooksConfig )
28
38
)
29
39
30
40
// runCmd represents the run command.
31
41
var runCmd = & cobra.Command {
32
42
Use : "run" ,
33
43
Short : "Run a gatewayd instance" ,
34
44
Run : func (cmd * cobra.Command , args []string ) {
45
+ // The plugins are loaded and hooks registered
46
+ // before the configuration is loaded.
47
+ hooksConfig .Logger = DefaultLogger
48
+
49
+ // Load the plugin configuration file
50
+ if f , err := cmd .Flags ().GetString ("plugin-config" ); err == nil {
51
+ if err := pluginConfig .Load (file .Provider (f ), yaml .Parser ()); err != nil {
52
+ DefaultLogger .Fatal ().Err (err ).Msg ("Failed to load plugin configuration" )
53
+ os .Exit (gerr .FailedToLoadPluginConfig )
54
+ }
55
+ }
56
+
57
+ // Load plugins and register their hooks
58
+ pluginRegistry .LoadPlugins (pluginConfig )
59
+
35
60
if f , err := cmd .Flags ().GetString ("config" ); err == nil {
36
61
if err := globalConfig .Load (file .Provider (f ), yaml .Parser ()); err != nil {
37
- panic (err )
62
+ DefaultLogger .Fatal ().Err (err ).Msg ("Failed to load configuration" )
63
+ os .Exit (gerr .FailedToLoadGlobalConfig )
38
64
}
39
65
}
40
66
@@ -43,39 +69,47 @@ var runCmd = &cobra.Command{
43
69
44
70
// The config will be passed to the hooks, and in turn to the plugins that
45
71
// register to this hook.
46
- result := hooksConfig .Run (
47
- plugin .OnConfigLoaded ,
48
- plugin.Signature {"config" : globalConfig .All ()},
49
- hooksConfig .Verification )
50
- if result != nil {
51
- var config map [string ]interface {}
52
- if cfg , ok := result ["config" ].(map [string ]interface {}); ok {
53
- config = cfg
54
- }
55
-
56
- if config != nil {
57
- // Load the config from the map emitted by the hook
58
- var hookEmittedConfig * koanf.Koanf
59
- if err := hookEmittedConfig .Load (confmap .Provider (config , "." ), nil ); err != nil {
60
- // Since the logger is not yet initialized, we can't log the error.
61
- // So we panic. Same happens in the next if statement.
62
- panic (err )
63
- }
72
+ currentGlobalConfig , err := structpb .NewStruct (globalConfig .All ())
73
+ if err != nil {
74
+ DefaultLogger .Error ().Err (err ).Msg ("Failed to convert configuration to structpb" )
75
+ } else {
76
+ updatedGlobalConfig , _ := hooksConfig .Run (
77
+ context .Background (),
78
+ currentGlobalConfig ,
79
+ plugin .OnConfigLoaded ,
80
+ hooksConfig .Verification )
64
81
82
+ if updatedGlobalConfig != nil && plugin .Verify (updatedGlobalConfig , currentGlobalConfig ) {
65
83
// Merge the config with the one loaded from the file (in memory).
66
84
// The changes won't be persisted to disk.
67
- if err := globalConfig .Merge (hookEmittedConfig ); err != nil {
68
- panic (err )
85
+ if err := globalConfig .Load (
86
+ confmap .Provider (updatedGlobalConfig .AsMap (), "." ), nil ); err != nil {
87
+ DefaultLogger .Fatal ().Err (err ).Msg ("Failed to merge configuration" )
69
88
}
70
89
}
71
90
}
72
91
73
92
// Create a new logger from the config
74
- logger := logging .NewLogger (loggerConfig ())
75
- hooksConfig .Logger = logger
93
+ loggerCfg := loggerConfig ()
94
+ logger := logging .NewLogger (loggerCfg )
95
+ // TODO: Use https://github.com/dcarbone/zadapters to adapt hclog to zerolog
76
96
// This is a notification hook, so we don't care about the result.
77
- hooksConfig .Run (
78
- plugin .OnNewLogger , plugin.Signature {"logger" : logger }, hooksConfig .Verification )
97
+ data , err := structpb .NewStruct (map [string ]interface {}{
98
+ "timeFormat" : loggerCfg .TimeFormat ,
99
+ "level" : loggerCfg .Level ,
100
+ "output" : loggerCfg .Output ,
101
+ "noColor" : loggerCfg .NoColor ,
102
+ })
103
+ if err != nil {
104
+ logger .Error ().Err (err ).Msg ("Failed to convert logger config to structpb" )
105
+ } else {
106
+ // TODO: Use a context with a timeout
107
+ _ , err := hooksConfig .Run (
108
+ context .Background (), data , plugin .OnNewLogger , hooksConfig .Verification )
109
+ if err != nil {
110
+ logger .Error ().Err (err ).Msg ("Failed to run OnNewLogger hooks" )
111
+ }
112
+ }
79
113
80
114
// Create and initialize a pool of connections
81
115
pool := pool .NewPool ()
@@ -90,15 +124,26 @@ var runCmd = &cobra.Command{
90
124
logger ,
91
125
)
92
126
93
- hooksConfig .Run (
94
- plugin .OnNewClient ,
95
- plugin.Signature {
96
- "client" : client ,
97
- },
98
- hooksConfig .Verification ,
99
- )
100
-
101
127
if client != nil {
128
+ clientCfg , err := structpb .NewStruct (map [string ]interface {}{
129
+ "id" : client .ID ,
130
+ "network" : clientConfig .Network ,
131
+ "address" : clientConfig .Address ,
132
+ "receiveBufferSize" : clientConfig .ReceiveBufferSize ,
133
+ })
134
+ if err != nil {
135
+ logger .Error ().Err (err ).Msg ("Failed to convert client config to structpb" )
136
+ } else {
137
+ _ , err := hooksConfig .Run (
138
+ context .Background (),
139
+ clientCfg ,
140
+ plugin .OnNewClient ,
141
+ hooksConfig .Verification )
142
+ if err != nil {
143
+ logger .Error ().Err (err ).Msg ("Failed to run OnNewClient hooks" )
144
+ }
145
+ }
146
+
102
147
pool .Put (client .ID , client )
103
148
}
104
149
}
@@ -113,15 +158,38 @@ var runCmd = &cobra.Command{
113
158
os .Exit (1 )
114
159
}
115
160
116
- hooksConfig .Run (
117
- plugin .OnNewPool , plugin.Signature {"pool" : pool }, hooksConfig .Verification )
161
+ poolCfg , err := structpb .NewStruct (map [string ]interface {}{
162
+ "size" : poolSize ,
163
+ })
164
+ if err != nil {
165
+ logger .Error ().Err (err ).Msg ("Failed to convert pool config to structpb" )
166
+ } else {
167
+ _ , err := hooksConfig .Run (
168
+ context .Background (), poolCfg , plugin .OnNewPool , hooksConfig .Verification )
169
+ if err != nil {
170
+ logger .Error ().Err (err ).Msg ("Failed to run OnNewPool hooks" )
171
+ }
172
+ }
118
173
119
174
// Create a prefork proxy with the pool of clients
120
175
elastic , reuseElasticClients , elasticClientConfig := proxyConfig ()
121
176
proxy := network .NewProxy (
122
177
pool , hooksConfig , elastic , reuseElasticClients , elasticClientConfig , logger )
123
- hooksConfig .Run (
124
- plugin .OnNewProxy , plugin.Signature {"proxy" : proxy }, hooksConfig .Verification )
178
+
179
+ proxyCfg , err := structpb .NewStruct (map [string ]interface {}{
180
+ "elastic" : elastic ,
181
+ "reuseElasticClients" : reuseElasticClients ,
182
+ "clientConfig" : elasticClientConfig ,
183
+ })
184
+ if err != nil {
185
+ logger .Error ().Err (err ).Msg ("Failed to convert proxy config to structpb" )
186
+ } else {
187
+ _ , err := hooksConfig .Run (
188
+ context .Background (), proxyCfg , plugin .OnNewProxy , hooksConfig .Verification )
189
+ if err != nil {
190
+ logger .Error ().Err (err ).Msg ("Failed to run OnNewProxy hooks" )
191
+ }
192
+ }
125
193
126
194
// Create a server
127
195
serverConfig := serverConfig ()
@@ -166,9 +234,35 @@ var runCmd = &cobra.Command{
166
234
logger ,
167
235
hooksConfig ,
168
236
)
169
- hooksConfig .Run (
170
- plugin .OnNewServer , plugin.Signature {"server" : server }, hooksConfig .Verification )
171
237
238
+ serverCfg , err := structpb .NewStruct (map [string ]interface {}{
239
+ "network" : serverConfig .Network ,
240
+ "address" : serverConfig .Address ,
241
+ "softLimit" : serverConfig .SoftLimit ,
242
+ "hardLimit" : serverConfig .HardLimit ,
243
+ "tickInterval" : serverConfig .TickInterval ,
244
+ "multiCore" : serverConfig .MultiCore ,
245
+ "lockOSThread" : serverConfig .LockOSThread ,
246
+ "enableTicker" : serverConfig .EnableTicker ,
247
+ "loadBalancer" : serverConfig .LoadBalancer ,
248
+ "readBufferCap" : serverConfig .ReadBufferCap ,
249
+ "writeBufferCap" : serverConfig .WriteBufferCap ,
250
+ "socketRecvBuffer" : serverConfig .SocketRecvBuffer ,
251
+ "socketSendBuffer" : serverConfig .SocketSendBuffer ,
252
+ "reuseAddress" : serverConfig .ReuseAddress ,
253
+ "reusePort" : serverConfig .ReusePort ,
254
+ "tcpKeepAlive" : serverConfig .TCPKeepAlive ,
255
+ "tcpNoDelay" : serverConfig .TCPNoDelay ,
256
+ })
257
+ if err != nil {
258
+ logger .Error ().Err (err ).Msg ("Failed to convert server config to structpb" )
259
+ } else {
260
+ _ , err := hooksConfig .Run (
261
+ context .Background (), serverCfg , plugin .OnNewServer , hooksConfig .Verification )
262
+ if err != nil {
263
+ logger .Error ().Err (err ).Msg ("Failed to run OnNewServer hooks" )
264
+ }
265
+ }
172
266
// Shutdown the server gracefully
173
267
var signals []os.Signal
174
268
signals = append (signals ,
@@ -187,10 +281,25 @@ var runCmd = &cobra.Command{
187
281
for _ , s := range signals {
188
282
if sig != s {
189
283
// Notify the hooks that the server is shutting down
190
- hooksConfig .Run (
191
- plugin .OnSignal , plugin.Signature {"signal" : sig }, hooksConfig .Verification )
284
+ signalCfg , err := structpb .NewStruct (map [string ]interface {}{"signal" : sig })
285
+ if err != nil {
286
+ logger .Error ().Err (err ).Msg (
287
+ "Failed to convert signal config to structpb" )
288
+ } else {
289
+ _ , err := hooksConfig .Run (
290
+ context .Background (),
291
+ signalCfg ,
292
+ plugin .OnSignal ,
293
+ hooksConfig .Verification ,
294
+ )
295
+ if err != nil {
296
+ logger .Error ().Err (err ).Msg ("Failed to run OnSignal hooks" )
297
+ }
298
+ }
192
299
193
300
server .Shutdown ()
301
+ pluginRegistry .Shutdown ()
302
+ goplugin .CleanupClients ()
194
303
os .Exit (0 )
195
304
}
196
305
}
@@ -208,5 +317,11 @@ func init() {
208
317
rootCmd .AddCommand (runCmd )
209
318
210
319
runCmd .PersistentFlags ().StringVarP (
211
- & configFile , "config" , "c" , "./gatewayd.yaml" , "config file (default is ./gatewayd.yaml)" )
320
+ & globalConfigFile ,
321
+ "config" , "c" , "./gatewayd.yaml" ,
322
+ "config file (default is ./gatewayd.yaml)" )
323
+ runCmd .PersistentFlags ().StringVarP (
324
+ & pluginConfigFile ,
325
+ "plugin-config" , "p" , "./gatewayd_plugins.yaml" ,
326
+ "plugin config file (default is ./gatewayd_plugins.yaml)" )
212
327
}
0 commit comments