Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,14 @@ func emitUpdateRunStatusMetric(updateRun *placementv1beta1.ClusterStagedUpdateRu
klog.V(2).InfoS("There's no valid status condition on updateRun, status updating failed possibly", "updateRun", klog.KObj(updateRun))
}

func removeWaitTimeFromUpdateRunStatus(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
func removeWaitTimeFromUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj) {
// Remove waitTime from the updateRun status for AfterStageTask for type Approval.
if updateRun.Status.UpdateStrategySnapshot != nil {
for i := range updateRun.Status.UpdateStrategySnapshot.Stages {
for j := range updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks {
if updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].Type == placementv1beta1.AfterStageTaskTypeApproval {
updateRun.Status.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].WaitTime = nil
updateRunStatus := updateRun.GetUpdateRunStatus()
if updateRunStatus.UpdateStrategySnapshot != nil {
for i := range updateRunStatus.UpdateStrategySnapshot.Stages {
for j := range updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks {
if updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].Type == placementv1beta1.AfterStageTaskTypeApproval {
updateRunStatus.UpdateStrategySnapshot.Stages[i].AfterStageTasks[j].WaitTime = nil
}
}
}
Expand Down
403 changes: 229 additions & 174 deletions pkg/controllers/updaterun/initialization.go

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions pkg/controllers/updaterun/initialization_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "parent clusterResourcePlacement not found")
validateFailedInitCondition(ctx, updateRun, "parent placement not found")
})

It("Should fail to initialize if CRP does not have external rollout strategy type", func() {
Expand All @@ -151,7 +151,7 @@ var _ = Describe("Updaterun initialization tests", func() {

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun,
"parent clusterResourcePlacement does not have an external rollout strategy")
"parent placement does not have an external rollout strategy")
})

It("Should copy the ApplyStrategy in the CRP to the UpdateRun", func() {
Expand Down Expand Up @@ -405,7 +405,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no scheduled or to-be-deleted clusterResourceBindings found")
validateFailedInitCondition(ctx, updateRun, "no scheduled or to-be-deleted bindings found")
})

It("Should fail to initialize if the number of selected bindings does not match the observed cluster count", func() {
Expand Down Expand Up @@ -505,7 +505,7 @@ var _ = Describe("Updaterun initialization tests", func() {

By("Validating the initialization not failed due to no selected cluster")
// it should fail due to strategy not found
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
validateFailedInitCondition(ctx, updateRun, "referenced updateStrategy not found")
})

It("Should update the ObservedClusterCount to the number of scheduled bindings if it's pickAll policy", func() {
Expand All @@ -517,7 +517,7 @@ var _ = Describe("Updaterun initialization tests", func() {

By("Validating the initialization not failed due to no selected cluster")
// it should fail due to strategy not found
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
validateFailedInitCondition(ctx, updateRun, "referenced updateStrategy not found")

By("Validating the ObservedClusterCount is updated")
Expect(updateRun.Status.PolicyObservedClusterCount).To(Equal(1), "failed to update the updateRun PolicyObservedClusterCount status")
Expand Down Expand Up @@ -565,7 +565,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
validateFailedInitCondition(ctx, updateRun, "referenced updateStrategy not found")
})

Context("Test computeRunStageStatus", func() {
Expand Down Expand Up @@ -777,7 +777,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -792,7 +792,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -807,7 +807,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no clusterResourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -822,7 +822,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no master clusterResourceSnapshot found for clusterResourcePlacement")
validateFailedInitCondition(ctx, updateRun, "no master resourceSnapshot found for placement")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand Down
28 changes: 20 additions & 8 deletions pkg/controllers/updaterun/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (r *Reconciler) validate(
updateRunCopy := updateRun.DeepCopy()
klog.V(2).InfoS("Start to validate the clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)

// Validate the ClusterResourcePlacement object referenced by the ClusterStagedUpdateRun.
placementName, err := r.validateCRP(ctx, updateRunCopy)
// Validate the Placement object referenced by the UpdateRun.
placementName, err := r.validatePlacement(ctx, updateRunCopy)
if err != nil {
return -1, nil, nil, err
}
Expand Down Expand Up @@ -85,7 +85,18 @@ func (r *Reconciler) validate(
if err != nil {
return -1, nil, nil, err
}
return updatingStageIndex, scheduledBindings, toBeDeletedBindings, nil

// TODO (arvindth): remove this conversion step after refactoring other functions to use interface types.
concreteScheduledBindings, err := controller.ConvertBindingObjsToConcreteCRBArray(scheduledBindings)
if err != nil {
return -1, nil, nil, err
}
concreateToBeDeletedBindings, err := controller.ConvertBindingObjsToConcreteCRBArray(toBeDeletedBindings)
if err != nil {
return -1, nil, nil, err
}

return updatingStageIndex, concreteScheduledBindings, concreateToBeDeletedBindings, nil
}

// validateStagesStatus validates both the update and delete stages of the ClusterStagedUpdateRun.
Expand All @@ -95,7 +106,7 @@ func (r *Reconciler) validate(
// If the updating stage index is len(updateRun.Status.StagesStatus), the next stage to be updated will be the delete stage.
func (r *Reconciler) validateStagesStatus(
ctx context.Context,
scheduledBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
scheduledBindings, toBeDeletedBindings []placementv1beta1.BindingObj,
updateRun, updateRunCopy *placementv1beta1.ClusterStagedUpdateRun,
) (int, error) {
updateRunRef := klog.KObj(updateRun)
Expand Down Expand Up @@ -256,7 +267,7 @@ func validateClusterUpdatingStatus(
// It returns the updating stage index, or any error encountered.
func validateDeleteStageStatus(
updatingStageIndex, lastFinishedStageIndex, totalStages int,
toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
toBeDeletedBindings []placementv1beta1.BindingObj,
updateRun *placementv1beta1.ClusterStagedUpdateRun,
) (int, error) {
updateRunRef := klog.KObj(updateRun)
Expand All @@ -275,9 +286,10 @@ func validateDeleteStageStatus(
deletingClusterMap[cluster.ClusterName] = struct{}{}
}
for _, binding := range toBeDeletedBindings {
if _, ok := deletingClusterMap[binding.Spec.TargetCluster]; !ok {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` to be deleted is not in the delete stage", binding.Spec.TargetCluster))
klog.ErrorS(unexpectedErr, "Detect new cluster to be unscheduled", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
bindingSpec := binding.GetBindingSpec()
if _, ok := deletingClusterMap[bindingSpec.TargetCluster]; !ok {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` to be deleted is not in the delete stage", bindingSpec.TargetCluster))
klog.ErrorS(unexpectedErr, "Detect new cluster to be unscheduled", "binding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/updaterun/validation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ var _ = Describe("UpdateRun validation tests", func() {

By("Validating the validation failed")
wantStatus = generateFailedValidationStatus(updateRun, wantStatus)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "parent clusterResourcePlacement not found")
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "parent placement not found")
})

It("Should fail to validate if CRP does not have external rollout strategy type", func() {
Expand All @@ -183,7 +183,7 @@ var _ = Describe("UpdateRun validation tests", func() {
By("Validating the validation failed")
wantStatus = generateFailedValidationStatus(updateRun, wantStatus)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus,
"parent clusterResourcePlacement does not have an external rollout strategy")
"parent placement does not have an external rollout strategy")
})

It("Should fail to valdiate if the ApplyStrategy in the CRP has changed", func() {
Expand Down
16 changes: 8 additions & 8 deletions pkg/controllers/updaterun/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestValidateDeleteStageStatus(t *testing.T) {
name string
updatingStageIndex int
lastFinishedStageIndex int
toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
toBeDeletedBindings []placementv1beta1.BindingObj
deleteStageStatus *placementv1beta1.StageUpdatingStatus
wantErr error
wantUpdatingStageIndex int
Expand All @@ -257,12 +257,12 @@ func TestValidateDeleteStageStatus(t *testing.T) {
},
{
name: "validateDeleteStageStatus should return error if there's new to-be-deleted bindings",
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
{
toBeDeletedBindings: []placementv1beta1.BindingObj{
&placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
},
{
&placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "binding-2"},
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-2"},
},
Expand All @@ -280,8 +280,8 @@ func TestValidateDeleteStageStatus(t *testing.T) {
name: "validateDeleteStageStatus should not return error if there's fewer to-be-deleted bindings",
updatingStageIndex: -1,
lastFinishedStageIndex: -1,
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
{
toBeDeletedBindings: []placementv1beta1.BindingObj{
&placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
},
Expand All @@ -300,8 +300,8 @@ func TestValidateDeleteStageStatus(t *testing.T) {
name: "validateDeleteStageStatus should not return error if there are equal to-be-deleted bindings",
updatingStageIndex: -1,
lastFinishedStageIndex: -1,
toBeDeletedBindings: []*placementv1beta1.ClusterResourceBinding{
{
toBeDeletedBindings: []placementv1beta1.BindingObj{
&placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "binding-1"},
Spec: placementv1beta1.ResourceBindingSpec{TargetCluster: "cluster-1"},
},
Expand Down
16 changes: 16 additions & 0 deletions pkg/utils/controller/binding_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -122,3 +123,18 @@ func ConvertRBArrayToBindingObjs(bindings []*placementv1beta1.ResourceBinding) [
}
return result
}

// TODO(arvindth): remove method once we have updated updateRun controller to use interfaces.
// ConvertBindingObjsToConcreteCRBArray converts a slice of BindingObj interfaces to a slice of concrete ClusterResourceBinding pointers.
func ConvertBindingObjsToConcreteCRBArray(bindingObjs []placementv1beta1.BindingObj) ([]*placementv1beta1.ClusterResourceBinding, error) {
bindings := make([]*placementv1beta1.ClusterResourceBinding, len(bindingObjs))

for i, bindingObj := range bindingObjs {
if crb, ok := bindingObj.(*placementv1beta1.ClusterResourceBinding); ok {
bindings[i] = crb
} else {
return nil, fmt.Errorf("expected ClusterResourceBinding but got %T - initialize function needs further refactoring for namespace-scoped resources", bindingObj)
}
}
return bindings, nil
}
22 changes: 16 additions & 6 deletions pkg/utils/controller/placement_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,22 @@ func FetchPlacementFromKey(ctx context.Context, c client.Reader, placementKey qu
if err != nil {
return nil, err
}
return FetchPlacementFromNamespacedName(ctx, c, types.NamespacedName{Namespace: namespace, Name: name})
}

// FetchPlacementFromNamespacedName resolves a NamespacedName to a concrete placement object that implements PlacementObj.
func FetchPlacementFromNamespacedName(ctx context.Context, c client.Reader, nn types.NamespacedName) (fleetv1beta1.PlacementObj, error) {
// Check if the key contains a namespace separator
var placement fleetv1beta1.PlacementObj
if namespace != "" {
if nn.Namespace != "" {
// This is a namespaced ResourcePlacement
placement = &fleetv1beta1.ResourcePlacement{}
} else {
// This is a cluster-scoped ClusterResourcePlacement
placement = &fleetv1beta1.ClusterResourcePlacement{}
}
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
if err := c.Get(ctx, key, placement); err != nil {

if err := c.Get(ctx, nn, placement); err != nil {
return nil, err
}
return placement, nil
Expand Down Expand Up @@ -94,6 +96,14 @@ func GetObjectKeyFromNamespaceName(namespace, name string) string {
}
}

// GetNamespacedNameFromObject generates a NamespacedName from a meta object.
func GetNamespacedNameFromObject(obj metav1.Object) types.NamespacedName {
return types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}

// ExtractNamespaceNameFromKey resolves a PlacementKey to a (namespace, name) tuple of the placement object.
func ExtractNamespaceNameFromKey(key queue.PlacementKey) (string, string, error) {
return ExtractNamespaceNameFromKeyStr(string(key))
Expand Down
45 changes: 45 additions & 0 deletions pkg/utils/controller/strategy_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)

// FetchUpdateStrategyFromNamespacedName resolves a NamespacedName to a concrete staged update strategy object that implements UpdateStrategyObj.
// If Namespace is empty, it fetches a ClusterStagedUpdateStrategy (cluster-scoped).
// If Namespace is not empty, it fetches a StagedUpdateStrategy (namespace-scoped).
func FetchUpdateStrategyFromNamespacedName(ctx context.Context, c client.Reader, strategyKey types.NamespacedName) (placementv1beta1.UpdateStrategyObj, error) {
var strategy placementv1beta1.UpdateStrategyObj
// If Namespace is empty, it's a cluster-scoped strategy (ClusterStagedUpdateStrategy)
if strategyKey.Namespace == "" {
strategy = &placementv1beta1.ClusterStagedUpdateStrategy{}
} else {
// Otherwise, it's a namespaced strategy (StagedUpdateStrategy)
strategy = &placementv1beta1.StagedUpdateStrategy{}
}
err := c.Get(ctx, strategyKey, strategy)
if err != nil {
return nil, err
}
return strategy, nil
}
Loading
Loading