Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions pkg/module_manager/loader/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"

"github.com/deckhouse/deckhouse/pkg/log"

Expand Down Expand Up @@ -77,14 +76,9 @@ func (fl *FileSystemLoader) getBasicModule(definition moduleDefinition, commonSt
}

// LoadModule reads single directory and returns BasicModule
func (fl *FileSystemLoader) LoadModule(_, modulePath string) (*modules.BasicModule, error) {
// the module's parent directory
var modulesDir string
if strings.HasSuffix(modulePath, "/") {
modulesDir = filepath.Dir(strings.TrimRight(modulePath, "/"))
} else {
modulesDir = filepath.Dir(modulePath)
}
func (fl *FileSystemLoader) LoadModule(moduleName string) (*modules.BasicModule, error) {
modulesDir := fl.dirs[0]
modulePath := filepath.Join(modulesDir, moduleName)

commonStaticValues, err := utils.LoadValuesFileFromDir(modulesDir, app.StrictModeEnabled)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/module_manager/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (

type ModuleLoader interface {
LoadModules() ([]*modules.BasicModule, error)
LoadModule(moduleSource string, modulePath string) (*modules.BasicModule, error)
LoadModule(moduleName string) (*modules.BasicModule, error)
}
259 changes: 55 additions & 204 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/deckhouse/deckhouse/pkg/log"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

"github.com/flant/addon-operator/pkg/app"
"github.com/flant/addon-operator/pkg/helm"
Expand Down Expand Up @@ -1169,13 +1170,13 @@
}

// RunModuleWithNewOpenAPISchema updates the module's OpenAPI schema from modulePath directory and pushes RunModuleTask if the module is enabled
func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName, moduleSource, modulePath string) error {
func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName string) error {
currentModule := mm.modules.Get(moduleName)
if currentModule == nil {
return fmt.Errorf("failed to get basic module - not found")
}

basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)
basicModule, err := mm.moduleLoader.LoadModule(moduleName)
if err != nil {
return fmt.Errorf("load module: %w", err)
}
Expand All @@ -1193,216 +1194,66 @@
}

// RegisterModule checks if a module already exists and reapplies(reloads) its configuration.
// If it's a new module - converges all modules - EXPERIMENTAL
func (mm *ModuleManager) RegisterModule(_, _ string) error {
return fmt.Errorf("not implemented yet")
}

/*
if !mm.modules.IsInited() {
return moduleset.ErrNotInited
}

if mm.ModulesDir == "" {
log.Warnf("Empty modules directory is passed! No modules to load.")
return nil
}

if mm.moduleLoader == nil {
log.Errorf("no module loader set")
return fmt.Errorf("no module loader set")
}

// get basic module definition
basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)

if err != nil {
return fmt.Errorf("failed to get basic module's definition: %w", err)
}

moduleName := basicModule.GetName()

// load and registry global hooks

dep := &hooks.HookExecutionDependencyContainer{
HookMetricsStorage: mm.dependencies.HookMetricStorage,
KubeConfigManager: mm.dependencies.KubeConfigManager,
KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
MetricStorage: mm.dependencies.MetricStorage,
GlobalValuesGetter: mm.global,
}

basicModule.WithDependencies(dep)

// check if module already exists

if mm.modules.Has(moduleName) {
// if module is disabled in module manager
if !mm.IsModuleEnabled(moduleName) {
// update(upsert) module config in moduleset
mm.modules.Add(basicModule)
// get kube config for the module to check if it has enabled: true
moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
// if module isn't explicitly enabled in the module kube config - exit
if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
return nil
}
mm.AddEnabledModuleByConfigName(moduleName)

// if the module kube config has enabled true, check enable script
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
if err != nil {
return err
}

if isEnabled {
mm.SendModuleEvent(events.ModuleEvent{
ModuleName: moduleName,
EventType: events.ModuleEnabled,
})
err := mm.UpdateModuleKubeConfig(moduleName)
if err != nil {
return err
}
log.Infof("Push ConvergeModules task because %q Module was re-enabled", moduleName)
mm.PushConvergeModulesTask(moduleName, "re-enabled")
}
return nil
}
// module is enabled, disable its hooks
mm.DisableModuleHooks(moduleName)

module := mm.GetModule(moduleName)
// check for nil to prevent operator from panicking
if module == nil {
return fmt.Errorf("couldn't get %s module configuration", moduleName)
}

// deregister modules' hooks
module.DeregisterHooks()

// upsert a new module in the moduleset
mm.modules.Add(basicModule)

// check if module is enabled via enabled scripts
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
if err != nil {
return err
}

ev := events.ModuleEvent{
ModuleName: moduleName,
EventType: events.ModuleEnabled,
}

if isEnabled {
// enqueue module startup sequence if it is enabled
err := mm.PushRunModuleTask(moduleName, false)
if err != nil {
return err
}
} else {
ev.EventType = events.ModuleDisabled
mm.PushDeleteModuleTask(moduleName)
// modules is disabled - update modulemanager's state
mm.DeleteEnabledModuleName(moduleName)
}
mm.SendModuleEvent(ev)
return nil
}

// module doesn't exist
mm.modules.Add(basicModule)

// a new module requires to be registered

mm.SendModuleEvent(events.ModuleEvent{
ModuleName: moduleName,
EventType: events.ModuleRegistered,
})

// get kube config for the module to check if it has enabled: true
moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
// if module isn't explicitly enabled in the module kube config - exit

if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
return nil
}

mm.AddEnabledModuleByConfigName(moduleName)

// if the module kube config has enabled true, check enable script
isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})

if err != nil {
return err
}

if isEnabled {
err := mm.UpdateModuleKubeConfig(moduleName)
if err != nil {
return err
}
log.Infof("Push ConvergeModules task because %q Module was enabled", moduleName)
mm.PushConvergeModulesTask(moduleName, "registered-and-enabled")
mm.SendModuleEvent(events.ModuleEvent{
ModuleName: moduleName,
EventType: events.ModuleEnabled,
})
}

return nil
}

// PushDeleteModule pushes moduleDelete task for a module into the main queue
// TODO: EXPERIMENTAL
/*func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
// check if there is already moduleDelete task in the main queue for the module
if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
return
// If it's a new module - converges all modules
func (mm *ModuleManager) RegisterModule(ctx context.Context, moduleName string) error {
ctx, span := otel.Tracer(moduleManagerServiceName).Start(ctx, "RegisterModule")

Check failure on line 1199 in pkg/module_manager/module_manager.go

View workflow job for this annotation

GitHub Actions / Run Go linters

ineffectual assignment to ctx (ineffassign)
defer span.End()

span.SetAttributes(attribute.String("module", moduleName))

mm.logger.Debug("register the module", slog.String("module", moduleName))

if module := mm.GetModule(moduleName); module != nil {
mm.logger.Debug("unregister the module", slog.String("module", moduleName))

mm.DisableModuleHooks(moduleName)
mm.dependencies.HelmResourcesManager.StopMonitor(moduleName)
module.ResetState()
module.DeregisterHooks()
}

newTask := sh_task.NewTask(task.ModuleDelete).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: "ModuleManager-Delete-Module",
ModuleName: moduleName,
})
newTask.SetProp("triggered-by", "ModuleManager")
module, err := mm.moduleLoader.LoadModule(moduleName)
if err != nil {
return fmt.Errorf("load the module '%s': %w", moduleName, err)
}

mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
// load and registry global hooks
dep := &hooks.HookExecutionDependencyContainer{
HookMetricsStorage: mm.dependencies.HookMetricStorage,
KubeConfigManager: mm.dependencies.KubeConfigManager,
KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
MetricStorage: mm.dependencies.MetricStorage,
GlobalValuesGetter: mm.global,
}

log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
mm.PushConvergeModulesTask(moduleName, "disabled")
}
module.WithDependencies(dep)

// PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
// TODO: EXPERIMENTAL
func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
newConvergeTask := sh_task.NewTask(task.ConvergeModules).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
ModuleName: moduleName,
}).
WithQueuedAt(time.Now())
newConvergeTask.SetProp("triggered-by", "ModuleManager")
newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)
// add module to set
mm.modules.Add(module)

mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
}
// add module to scheduler
if err = mm.moduleScheduler.AddModuleVertex(module); err != nil {
return err
}

// queueHasPendingModuleDeleteTask returns true if queue has pending tasks
// with the type "ModuleDelete" related to the module "moduleName"
// TODO: EXPERIMENTAL
func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
if q == nil {
return false
// reinit scheduler
if err = mm.moduleScheduler.Initialize(); err != nil {
return fmt.Errorf("reinitialize module scheduler: %w", err)
}
modules := modulesWithPendingTasks(q, task.ModuleDelete)
meta, has := modules[moduleName]
return has && meta.doStartup
} */

mm.SendModuleEvent(events.ModuleEvent{
ModuleName: module.Name,
EventType: events.ModuleRegistered,
})

// send event to trigger reconverge
mm.SchedulerEventCh() <- extenders.ExtenderEvent{
ExtenderName: dynamic_extender.Name,
EncapsulatedEvent: dynamic_extender.DynamicExtenderEvent{},
}

return nil
}

// registerModules load all available modules from modules directory.
func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.Extender) error {
Expand Down
19 changes: 13 additions & 6 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Scheduler struct {
// provides a shared state of enabled modules to some extenders like script_enabled
vertexStateBuffer vertexStateBuffer

modules []node.ModuleInterface
modules map[string]node.ModuleInterface

logger *log.Logger
}
Expand All @@ -105,7 +105,7 @@ func NewScheduler(ctx context.Context, logger *log.Logger) *Scheduler {
dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic(), graph.PreventCycles()),
diff: make(map[string]bool),
vertexStateBuffer: vertexStateBuffer{},
modules: make([]node.ModuleInterface, 0),
modules: make(map[string]node.ModuleInterface),
logger: logger,
}
}
Expand Down Expand Up @@ -172,22 +172,29 @@ func (s *Scheduler) AddModuleVertex(module node.ModuleInterface) error {
weight = functionalWeight
}

if err := s.dag.RemoveVertex(module.GetName()); err != nil {
if !errors.Is(err, graph.ErrVertexNotFound) {
return fmt.Errorf("remove module vertex '%s': %w", module.GetName(), err)
}
}

vertex := node.NewNode().
WithName(module.GetName()).
WithWeight(weight).
WithType(node.ModuleType).
WithModule(module)

if err := s.dag.AddVertex(vertex,
err := s.dag.AddVertex(vertex,
graph.VertexAttribute("colorscheme", "greens3"),
graph.VertexAttribute("style", "filled"),
graph.VertexAttribute("color", "2"),
graph.VertexAttribute("fillcolor", "1"),
graph.VertexAttribute(node.TypeAttribute, string(node.ModuleType))); err != nil {
return err
graph.VertexAttribute(node.TypeAttribute, string(node.ModuleType)))
if err != nil {
return fmt.Errorf("add module vertex '%s': %w", module.GetName(), err)
}

s.modules = append(s.modules, module)
s.modules[module.GetName()] = module

return nil
}
Expand Down
Loading