@@ -14,6 +14,7 @@ import (
14
14
"github.com/deckhouse/deckhouse/pkg/log"
15
15
"github.com/hashicorp/go-multierror"
16
16
"go.opentelemetry.io/otel"
17
+ "go.opentelemetry.io/otel/attribute"
17
18
18
19
"github.com/flant/addon-operator/pkg/app"
19
20
"github.com/flant/addon-operator/pkg/helm"
@@ -1169,13 +1170,13 @@ func (mm *ModuleManager) AreModulesInited() bool {
1169
1170
}
1170
1171
1171
1172
// RunModuleWithNewOpenAPISchema updates the module's OpenAPI schema from modulePath directory and pushes RunModuleTask if the module is enabled
1172
- func (mm * ModuleManager ) RunModuleWithNewOpenAPISchema (moduleName , moduleSource , modulePath string ) error {
1173
+ func (mm * ModuleManager ) RunModuleWithNewOpenAPISchema (moduleName string ) error {
1173
1174
currentModule := mm .modules .Get (moduleName )
1174
1175
if currentModule == nil {
1175
1176
return fmt .Errorf ("failed to get basic module - not found" )
1176
1177
}
1177
1178
1178
- basicModule , err := mm .moduleLoader .LoadModule (moduleSource , modulePath )
1179
+ basicModule , err := mm .moduleLoader .LoadModule (moduleName )
1179
1180
if err != nil {
1180
1181
return fmt .Errorf ("load module: %w" , err )
1181
1182
}
@@ -1193,216 +1194,66 @@ func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName, moduleSource,
1193
1194
}
1194
1195
1195
1196
// RegisterModule checks if a module already exists and reapplies(reloads) its configuration.
1196
- // If it's a new module - converges all modules - EXPERIMENTAL
1197
- func (mm * ModuleManager ) RegisterModule (_ , _ string ) error {
1198
- return fmt .Errorf ("not implemented yet" )
1199
- }
1200
-
1201
- /*
1202
- if !mm.modules.IsInited() {
1203
- return moduleset.ErrNotInited
1204
- }
1205
-
1206
- if mm.ModulesDir == "" {
1207
- log.Warnf("Empty modules directory is passed! No modules to load.")
1208
- return nil
1209
- }
1210
-
1211
- if mm.moduleLoader == nil {
1212
- log.Errorf("no module loader set")
1213
- return fmt.Errorf("no module loader set")
1214
- }
1215
-
1216
- // get basic module definition
1217
- basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)
1218
-
1219
- if err != nil {
1220
- return fmt.Errorf("failed to get basic module's definition: %w", err)
1221
- }
1222
-
1223
- moduleName := basicModule.GetName()
1224
-
1225
- // load and registry global hooks
1226
-
1227
- dep := &hooks.HookExecutionDependencyContainer{
1228
- HookMetricsStorage: mm.dependencies.HookMetricStorage,
1229
- KubeConfigManager: mm.dependencies.KubeConfigManager,
1230
- KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
1231
- MetricStorage: mm.dependencies.MetricStorage,
1232
- GlobalValuesGetter: mm.global,
1233
- }
1234
-
1235
- basicModule.WithDependencies(dep)
1236
-
1237
- // check if module already exists
1238
-
1239
- if mm.modules.Has(moduleName) {
1240
- // if module is disabled in module manager
1241
- if !mm.IsModuleEnabled(moduleName) {
1242
- // update(upsert) module config in moduleset
1243
- mm.modules.Add(basicModule)
1244
- // get kube config for the module to check if it has enabled: true
1245
- moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1246
- // if module isn't explicitly enabled in the module kube config - exit
1247
- if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1248
- return nil
1249
- }
1250
- mm.AddEnabledModuleByConfigName(moduleName)
1251
-
1252
- // if the module kube config has enabled true, check enable script
1253
- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1254
- if err != nil {
1255
- return err
1256
- }
1257
-
1258
- if isEnabled {
1259
- mm.SendModuleEvent(events.ModuleEvent{
1260
- ModuleName: moduleName,
1261
- EventType: events.ModuleEnabled,
1262
- })
1263
- err := mm.UpdateModuleKubeConfig(moduleName)
1264
- if err != nil {
1265
- return err
1266
- }
1267
- log.Infof("Push ConvergeModules task because %q Module was re-enabled", moduleName)
1268
- mm.PushConvergeModulesTask(moduleName, "re-enabled")
1269
- }
1270
- return nil
1271
- }
1272
- // module is enabled, disable its hooks
1273
- mm.DisableModuleHooks(moduleName)
1274
-
1275
- module := mm.GetModule(moduleName)
1276
- // check for nil to prevent operator from panicking
1277
- if module == nil {
1278
- return fmt.Errorf("couldn't get %s module configuration", moduleName)
1279
- }
1280
-
1281
- // deregister modules' hooks
1282
- module.DeregisterHooks()
1283
-
1284
- // upsert a new module in the moduleset
1285
- mm.modules.Add(basicModule)
1286
-
1287
- // check if module is enabled via enabled scripts
1288
- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1289
- if err != nil {
1290
- return err
1291
- }
1292
-
1293
- ev := events.ModuleEvent{
1294
- ModuleName: moduleName,
1295
- EventType: events.ModuleEnabled,
1296
- }
1297
-
1298
- if isEnabled {
1299
- // enqueue module startup sequence if it is enabled
1300
- err := mm.PushRunModuleTask(moduleName, false)
1301
- if err != nil {
1302
- return err
1303
- }
1304
- } else {
1305
- ev.EventType = events.ModuleDisabled
1306
- mm.PushDeleteModuleTask(moduleName)
1307
- // modules is disabled - update modulemanager's state
1308
- mm.DeleteEnabledModuleName(moduleName)
1309
- }
1310
- mm.SendModuleEvent(ev)
1311
- return nil
1312
- }
1313
-
1314
- // module doesn't exist
1315
- mm.modules.Add(basicModule)
1316
-
1317
- // a new module requires to be registered
1318
-
1319
- mm.SendModuleEvent(events.ModuleEvent{
1320
- ModuleName: moduleName,
1321
- EventType: events.ModuleRegistered,
1322
- })
1323
-
1324
- // get kube config for the module to check if it has enabled: true
1325
- moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1326
- // if module isn't explicitly enabled in the module kube config - exit
1327
-
1328
- if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1329
- return nil
1330
- }
1331
-
1332
- mm.AddEnabledModuleByConfigName(moduleName)
1333
-
1334
- // if the module kube config has enabled true, check enable script
1335
- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1336
-
1337
- if err != nil {
1338
- return err
1339
- }
1340
-
1341
- if isEnabled {
1342
- err := mm.UpdateModuleKubeConfig(moduleName)
1343
- if err != nil {
1344
- return err
1345
- }
1346
- log.Infof("Push ConvergeModules task because %q Module was enabled", moduleName)
1347
- mm.PushConvergeModulesTask(moduleName, "registered-and-enabled")
1348
- mm.SendModuleEvent(events.ModuleEvent{
1349
- ModuleName: moduleName,
1350
- EventType: events.ModuleEnabled,
1351
- })
1352
- }
1353
-
1354
- return nil
1355
- }
1356
-
1357
- // PushDeleteModule pushes moduleDelete task for a module into the main queue
1358
- // TODO: EXPERIMENTAL
1359
- /*func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
1360
- // check if there is already moduleDelete task in the main queue for the module
1361
- if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
1362
- return
1197
+ // If it's a new module - converges all modules
1198
+ func (mm * ModuleManager ) RegisterModule (ctx context.Context , moduleName string ) error {
1199
+ ctx , span := otel .Tracer (moduleManagerServiceName ).Start (ctx , "RegisterModule" )
1200
+ defer span .End ()
1201
+
1202
+ span .SetAttributes (attribute .String ("module" , moduleName ))
1203
+
1204
+ mm .logger .Debug ("register the module" , slog .String ("module" , moduleName ))
1205
+
1206
+ if module := mm .GetModule (moduleName ); module != nil {
1207
+ mm .logger .Debug ("unregister the module" , slog .String ("module" , moduleName ))
1208
+
1209
+ mm .DisableModuleHooks (moduleName )
1210
+ mm .dependencies .HelmResourcesManager .StopMonitor (moduleName )
1211
+ module .ResetState ()
1212
+ module .DeregisterHooks ()
1363
1213
}
1364
1214
1365
- newTask := sh_task.NewTask(task.ModuleDelete).
1366
- WithQueueName("main").
1367
- WithMetadata(task.HookMetadata{
1368
- EventDescription: "ModuleManager-Delete-Module",
1369
- ModuleName: moduleName,
1370
- })
1371
- newTask.SetProp("triggered-by", "ModuleManager")
1215
+ module , err := mm .moduleLoader .LoadModule (moduleName )
1216
+ if err != nil {
1217
+ return fmt .Errorf ("load the module '%s': %w" , moduleName , err )
1218
+ }
1372
1219
1373
- mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
1220
+ // load and registry global hooks
1221
+ dep := & hooks.HookExecutionDependencyContainer {
1222
+ HookMetricsStorage : mm .dependencies .HookMetricStorage ,
1223
+ KubeConfigManager : mm .dependencies .KubeConfigManager ,
1224
+ KubeObjectPatcher : mm .dependencies .KubeObjectPatcher ,
1225
+ MetricStorage : mm .dependencies .MetricStorage ,
1226
+ GlobalValuesGetter : mm .global ,
1227
+ }
1374
1228
1375
- log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
1376
- mm.PushConvergeModulesTask(moduleName, "disabled")
1377
- }
1229
+ module .WithDependencies (dep )
1378
1230
1379
- // PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
1380
- // TODO: EXPERIMENTAL
1381
- func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
1382
- newConvergeTask := sh_task.NewTask(task.ConvergeModules).
1383
- WithQueueName("main").
1384
- WithMetadata(task.HookMetadata{
1385
- EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
1386
- ModuleName: moduleName,
1387
- }).
1388
- WithQueuedAt(time.Now())
1389
- newConvergeTask.SetProp("triggered-by", "ModuleManager")
1390
- newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)
1231
+ // add module to set
1232
+ mm .modules .Add (module )
1391
1233
1392
- mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
1393
- }
1234
+ // add module to scheduler
1235
+ if err = mm .moduleScheduler .AddModuleVertex (module ); err != nil {
1236
+ return fmt .Errorf ("add module vertex: %w" , err )
1237
+ }
1394
1238
1395
- // queueHasPendingModuleDeleteTask returns true if queue has pending tasks
1396
- // with the type "ModuleDelete" related to the module "moduleName"
1397
- // TODO: EXPERIMENTAL
1398
- func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
1399
- if q == nil {
1400
- return false
1239
+ // reinit scheduler
1240
+ if err = mm .moduleScheduler .Initialize (); err != nil {
1241
+ return fmt .Errorf ("initialize module scheduler: %w" , err )
1401
1242
}
1402
- modules := modulesWithPendingTasks(q, task.ModuleDelete)
1403
- meta, has := modules[moduleName]
1404
- return has && meta.doStartup
1405
- } */
1243
+
1244
+ mm .SendModuleEvent (events.ModuleEvent {
1245
+ ModuleName : module .Name ,
1246
+ EventType : events .ModuleRegistered ,
1247
+ })
1248
+
1249
+ // send event to trigger reconverge
1250
+ mm .SchedulerEventCh () <- extenders.ExtenderEvent {
1251
+ ExtenderName : dynamic_extender .Name ,
1252
+ EncapsulatedEvent : dynamic_extender.DynamicExtenderEvent {},
1253
+ }
1254
+
1255
+ return nil
1256
+ }
1406
1257
1407
1258
// registerModules load all available modules from modules directory.
1408
1259
func (mm * ModuleManager ) registerModules (scriptEnabledExtender * script_extender.Extender ) error {
0 commit comments