Skip to content

Commit 68f7260

Browse files
committed
[feature] runtime module loading
Signed-off-by: Stepan Paksashvili <[email protected]>
1 parent 0fec244 commit 68f7260

File tree

3 files changed

+59
-214
lines changed

3 files changed

+59
-214
lines changed

pkg/module_manager/loader/fs/fs.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"path/filepath"
1010
"regexp"
1111
"strconv"
12-
"strings"
1312

1413
"github.com/deckhouse/deckhouse/pkg/log"
1514

@@ -77,14 +76,9 @@ func (fl *FileSystemLoader) getBasicModule(definition moduleDefinition, commonSt
7776
}
7877

7978
// LoadModule reads single directory and returns BasicModule
80-
func (fl *FileSystemLoader) LoadModule(_, modulePath string) (*modules.BasicModule, error) {
81-
// the module's parent directory
82-
var modulesDir string
83-
if strings.HasSuffix(modulePath, "/") {
84-
modulesDir = filepath.Dir(strings.TrimRight(modulePath, "/"))
85-
} else {
86-
modulesDir = filepath.Dir(modulePath)
87-
}
79+
func (fl *FileSystemLoader) LoadModule(moduleName string) (*modules.BasicModule, error) {
80+
modulesDir := fl.dirs[0]
81+
modulePath := filepath.Join(modulesDir, moduleName)
8882

8983
commonStaticValues, err := utils.LoadValuesFileFromDir(modulesDir, app.StrictModeEnabled)
9084
if err != nil {

pkg/module_manager/loader/loader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ import (
66

77
type ModuleLoader interface {
88
LoadModules() ([]*modules.BasicModule, error)
9-
LoadModule(moduleSource string, modulePath string) (*modules.BasicModule, error)
9+
LoadModule(moduleName string) (*modules.BasicModule, error)
1010
}

pkg/module_manager/module_manager.go

Lines changed: 55 additions & 204 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/deckhouse/deckhouse/pkg/log"
1515
"github.com/hashicorp/go-multierror"
1616
"go.opentelemetry.io/otel"
17+
"go.opentelemetry.io/otel/attribute"
1718

1819
"github.com/flant/addon-operator/pkg/app"
1920
"github.com/flant/addon-operator/pkg/helm"
@@ -1169,13 +1170,13 @@ func (mm *ModuleManager) AreModulesInited() bool {
11691170
}
11701171

11711172
// RunModuleWithNewOpenAPISchema updates the module's OpenAPI schema from modulePath directory and pushes RunModuleTask if the module is enabled
1172-
func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName, moduleSource, modulePath string) error {
1173+
func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName string) error {
11731174
currentModule := mm.modules.Get(moduleName)
11741175
if currentModule == nil {
11751176
return fmt.Errorf("failed to get basic module - not found")
11761177
}
11771178

1178-
basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)
1179+
basicModule, err := mm.moduleLoader.LoadModule(moduleName)
11791180
if err != nil {
11801181
return fmt.Errorf("load module: %w", err)
11811182
}
@@ -1193,216 +1194,66 @@ func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName, moduleSource,
11931194
}
11941195

11951196
// RegisterModule checks if a module already exists and reapplies(reloads) its configuration.
1196-
// If it's a new module - converges all modules - EXPERIMENTAL
1197-
func (mm *ModuleManager) RegisterModule(_, _ string) error {
1198-
return fmt.Errorf("not implemented yet")
1199-
}
1200-
1201-
/*
1202-
if !mm.modules.IsInited() {
1203-
return moduleset.ErrNotInited
1204-
}
1205-
1206-
if mm.ModulesDir == "" {
1207-
log.Warnf("Empty modules directory is passed! No modules to load.")
1208-
return nil
1209-
}
1210-
1211-
if mm.moduleLoader == nil {
1212-
log.Errorf("no module loader set")
1213-
return fmt.Errorf("no module loader set")
1214-
}
1215-
1216-
// get basic module definition
1217-
basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)
1218-
1219-
if err != nil {
1220-
return fmt.Errorf("failed to get basic module's definition: %w", err)
1221-
}
1222-
1223-
moduleName := basicModule.GetName()
1224-
1225-
// load and registry global hooks
1226-
1227-
dep := &hooks.HookExecutionDependencyContainer{
1228-
HookMetricsStorage: mm.dependencies.HookMetricStorage,
1229-
KubeConfigManager: mm.dependencies.KubeConfigManager,
1230-
KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
1231-
MetricStorage: mm.dependencies.MetricStorage,
1232-
GlobalValuesGetter: mm.global,
1233-
}
1234-
1235-
basicModule.WithDependencies(dep)
1236-
1237-
// check if module already exists
1238-
1239-
if mm.modules.Has(moduleName) {
1240-
// if module is disabled in module manager
1241-
if !mm.IsModuleEnabled(moduleName) {
1242-
// update(upsert) module config in moduleset
1243-
mm.modules.Add(basicModule)
1244-
// get kube config for the module to check if it has enabled: true
1245-
moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1246-
// if module isn't explicitly enabled in the module kube config - exit
1247-
if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1248-
return nil
1249-
}
1250-
mm.AddEnabledModuleByConfigName(moduleName)
1251-
1252-
// if the module kube config has enabled true, check enable script
1253-
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1254-
if err != nil {
1255-
return err
1256-
}
1257-
1258-
if isEnabled {
1259-
mm.SendModuleEvent(events.ModuleEvent{
1260-
ModuleName: moduleName,
1261-
EventType: events.ModuleEnabled,
1262-
})
1263-
err := mm.UpdateModuleKubeConfig(moduleName)
1264-
if err != nil {
1265-
return err
1266-
}
1267-
log.Infof("Push ConvergeModules task because %q Module was re-enabled", moduleName)
1268-
mm.PushConvergeModulesTask(moduleName, "re-enabled")
1269-
}
1270-
return nil
1271-
}
1272-
// module is enabled, disable its hooks
1273-
mm.DisableModuleHooks(moduleName)
1274-
1275-
module := mm.GetModule(moduleName)
1276-
// check for nil to prevent operator from panicking
1277-
if module == nil {
1278-
return fmt.Errorf("couldn't get %s module configuration", moduleName)
1279-
}
1280-
1281-
// deregister modules' hooks
1282-
module.DeregisterHooks()
1283-
1284-
// upsert a new module in the moduleset
1285-
mm.modules.Add(basicModule)
1286-
1287-
// check if module is enabled via enabled scripts
1288-
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1289-
if err != nil {
1290-
return err
1291-
}
1292-
1293-
ev := events.ModuleEvent{
1294-
ModuleName: moduleName,
1295-
EventType: events.ModuleEnabled,
1296-
}
1297-
1298-
if isEnabled {
1299-
// enqueue module startup sequence if it is enabled
1300-
err := mm.PushRunModuleTask(moduleName, false)
1301-
if err != nil {
1302-
return err
1303-
}
1304-
} else {
1305-
ev.EventType = events.ModuleDisabled
1306-
mm.PushDeleteModuleTask(moduleName)
1307-
// modules is disabled - update modulemanager's state
1308-
mm.DeleteEnabledModuleName(moduleName)
1309-
}
1310-
mm.SendModuleEvent(ev)
1311-
return nil
1312-
}
1313-
1314-
// module doesn't exist
1315-
mm.modules.Add(basicModule)
1316-
1317-
// a new module requires to be registered
1318-
1319-
mm.SendModuleEvent(events.ModuleEvent{
1320-
ModuleName: moduleName,
1321-
EventType: events.ModuleRegistered,
1322-
})
1323-
1324-
// get kube config for the module to check if it has enabled: true
1325-
moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1326-
// if module isn't explicitly enabled in the module kube config - exit
1327-
1328-
if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1329-
return nil
1330-
}
1331-
1332-
mm.AddEnabledModuleByConfigName(moduleName)
1333-
1334-
// if the module kube config has enabled true, check enable script
1335-
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1336-
1337-
if err != nil {
1338-
return err
1339-
}
1340-
1341-
if isEnabled {
1342-
err := mm.UpdateModuleKubeConfig(moduleName)
1343-
if err != nil {
1344-
return err
1345-
}
1346-
log.Infof("Push ConvergeModules task because %q Module was enabled", moduleName)
1347-
mm.PushConvergeModulesTask(moduleName, "registered-and-enabled")
1348-
mm.SendModuleEvent(events.ModuleEvent{
1349-
ModuleName: moduleName,
1350-
EventType: events.ModuleEnabled,
1351-
})
1352-
}
1353-
1354-
return nil
1355-
}
1356-
1357-
// PushDeleteModule pushes moduleDelete task for a module into the main queue
1358-
// TODO: EXPERIMENTAL
1359-
/*func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
1360-
// check if there is already moduleDelete task in the main queue for the module
1361-
if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
1362-
return
1197+
// If it's a new module - converges all modules
1198+
func (mm *ModuleManager) RegisterModule(ctx context.Context, moduleName string) error {
1199+
ctx, span := otel.Tracer(moduleManagerServiceName).Start(ctx, "RegisterModule")
1200+
defer span.End()
1201+
1202+
span.SetAttributes(attribute.String("module", moduleName))
1203+
1204+
mm.logger.Debug("register the module", slog.String("module", moduleName))
1205+
1206+
if module := mm.GetModule(moduleName); module != nil {
1207+
mm.logger.Debug("unregister the module", slog.String("module", moduleName))
1208+
1209+
mm.DisableModuleHooks(moduleName)
1210+
mm.dependencies.HelmResourcesManager.StopMonitor(moduleName)
1211+
module.ResetState()
1212+
module.DeregisterHooks()
13631213
}
13641214

1365-
newTask := sh_task.NewTask(task.ModuleDelete).
1366-
WithQueueName("main").
1367-
WithMetadata(task.HookMetadata{
1368-
EventDescription: "ModuleManager-Delete-Module",
1369-
ModuleName: moduleName,
1370-
})
1371-
newTask.SetProp("triggered-by", "ModuleManager")
1215+
module, err := mm.moduleLoader.LoadModule(moduleName)
1216+
if err != nil {
1217+
return fmt.Errorf("load the module '%s': %w", moduleName, err)
1218+
}
13721219

1373-
mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
1220+
// load and registry global hooks
1221+
dep := &hooks.HookExecutionDependencyContainer{
1222+
HookMetricsStorage: mm.dependencies.HookMetricStorage,
1223+
KubeConfigManager: mm.dependencies.KubeConfigManager,
1224+
KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
1225+
MetricStorage: mm.dependencies.MetricStorage,
1226+
GlobalValuesGetter: mm.global,
1227+
}
13741228

1375-
log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
1376-
mm.PushConvergeModulesTask(moduleName, "disabled")
1377-
}
1229+
module.WithDependencies(dep)
13781230

1379-
// PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
1380-
// TODO: EXPERIMENTAL
1381-
func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
1382-
newConvergeTask := sh_task.NewTask(task.ConvergeModules).
1383-
WithQueueName("main").
1384-
WithMetadata(task.HookMetadata{
1385-
EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
1386-
ModuleName: moduleName,
1387-
}).
1388-
WithQueuedAt(time.Now())
1389-
newConvergeTask.SetProp("triggered-by", "ModuleManager")
1390-
newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)
1231+
// add module to set
1232+
mm.modules.Add(module)
13911233

1392-
mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
1393-
}
1234+
// add module to scheduler
1235+
if err = mm.moduleScheduler.AddModuleVertex(module); err != nil {
1236+
return fmt.Errorf("add module vertex: %w", err)
1237+
}
13941238

1395-
// queueHasPendingModuleDeleteTask returns true if queue has pending tasks
1396-
// with the type "ModuleDelete" related to the module "moduleName"
1397-
// TODO: EXPERIMENTAL
1398-
func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
1399-
if q == nil {
1400-
return false
1239+
// reinit scheduler
1240+
if err = mm.moduleScheduler.Initialize(); err != nil {
1241+
return fmt.Errorf("initialize module scheduler: %w", err)
14011242
}
1402-
modules := modulesWithPendingTasks(q, task.ModuleDelete)
1403-
meta, has := modules[moduleName]
1404-
return has && meta.doStartup
1405-
} */
1243+
1244+
mm.SendModuleEvent(events.ModuleEvent{
1245+
ModuleName: module.Name,
1246+
EventType: events.ModuleRegistered,
1247+
})
1248+
1249+
// send event to trigger reconverge
1250+
mm.SchedulerEventCh() <- extenders.ExtenderEvent{
1251+
ExtenderName: dynamic_extender.Name,
1252+
EncapsulatedEvent: dynamic_extender.DynamicExtenderEvent{},
1253+
}
1254+
1255+
return nil
1256+
}
14061257

14071258
// registerModules load all available modules from modules directory.
14081259
func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.Extender) error {

0 commit comments

Comments
 (0)