Skip to content

Commit dd2ba4e

Browse files
ivankatliarchuktroll-os
authored andcommitted
refactor(source/istio): add transformers (kubernetes-sigs#5728)
* chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <[email protected]> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <[email protected]> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <[email protected]> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <[email protected]> --------- Signed-off-by: ivan katliarchuk <[email protected]>
1 parent caaa037 commit dd2ba4e

File tree

7 files changed

+528
-44
lines changed

7 files changed

+528
-44
lines changed

source/informers/fake.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ package informers
1515

1616
import (
1717
"github.com/stretchr/testify/mock"
18+
corev1 "k8s.io/api/core/v1"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/util/intstr"
1821
corev1lister "k8s.io/client-go/listers/core/v1"
1922
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
2023
"k8s.io/client-go/tools/cache"
@@ -58,3 +61,50 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
5861
func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
5962
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
6063
}
64+
65+
func fakeService() *corev1.Service {
66+
return &corev1.Service{
67+
ObjectMeta: metav1.ObjectMeta{
68+
Name: "fake-service",
69+
Namespace: "ns",
70+
Labels: map[string]string{"env": "prod", "team": "devops"},
71+
Annotations: map[string]string{"description": "Enriched service object"},
72+
UID: "1234",
73+
},
74+
Spec: corev1.ServiceSpec{
75+
Selector: map[string]string{"app": "demo"},
76+
ExternalIPs: []string{"1.2.3.4"},
77+
Ports: []corev1.ServicePort{
78+
{
79+
Name: "http",
80+
Port: 80,
81+
TargetPort: intstr.FromInt32(8080),
82+
Protocol: corev1.ProtocolTCP,
83+
},
84+
{
85+
Name: "https",
86+
Port: 443,
87+
TargetPort: intstr.FromInt32(8443),
88+
Protocol: corev1.ProtocolTCP,
89+
},
90+
},
91+
Type: corev1.ServiceTypeLoadBalancer,
92+
},
93+
Status: corev1.ServiceStatus{
94+
LoadBalancer: corev1.LoadBalancerStatus{
95+
Ingress: []corev1.LoadBalancerIngress{
96+
{IP: "5.6.7.8", Hostname: "lb.example.com"},
97+
},
98+
},
99+
Conditions: []metav1.Condition{
100+
{
101+
Type: "Available",
102+
Status: metav1.ConditionTrue,
103+
Reason: "MinimumReplicasAvailable",
104+
Message: "Service is available",
105+
LastTransitionTime: metav1.Now(),
106+
},
107+
},
108+
},
109+
}
110+
}

source/informers/transfomers.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package informers
15+
16+
import (
17+
corev1 "k8s.io/api/core/v1"
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/client-go/tools/cache"
20+
)
21+
22+
type TransformOptions struct {
23+
specSelector bool
24+
specExternalIps bool
25+
statusLb bool
26+
}
27+
28+
func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc {
29+
options := TransformOptions{}
30+
for _, fn := range optFns {
31+
fn(&options)
32+
}
33+
return func(obj any) (any, error) {
34+
// only transform if the object is a Service at the moment
35+
entity, ok := obj.(*corev1.Service)
36+
if !ok {
37+
return nil, nil
38+
}
39+
if entity.UID == "" {
40+
// Pod was already transformed and we must be idempotent.
41+
return entity, nil
42+
}
43+
svc := &corev1.Service{
44+
ObjectMeta: metav1.ObjectMeta{
45+
Name: entity.Name,
46+
Namespace: entity.Namespace,
47+
DeletionTimestamp: entity.DeletionTimestamp,
48+
},
49+
Spec: corev1.ServiceSpec{},
50+
Status: corev1.ServiceStatus{},
51+
}
52+
if options.specSelector {
53+
svc.Spec.Selector = entity.Spec.Selector
54+
}
55+
if options.specExternalIps {
56+
svc.Spec.ExternalIPs = entity.Spec.ExternalIPs
57+
}
58+
if options.statusLb {
59+
svc.Status.LoadBalancer = entity.Status.LoadBalancer
60+
}
61+
return svc, nil
62+
}
63+
}
64+
65+
// TransformWithSpecSelector enables copying the Service's .spec.selector field.
66+
func TransformWithSpecSelector() func(options *TransformOptions) {
67+
return func(options *TransformOptions) {
68+
options.specSelector = true
69+
}
70+
}
71+
72+
// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field.
73+
func TransformWithSpecExternalIPs() func(options *TransformOptions) {
74+
return func(options *TransformOptions) {
75+
options.specExternalIps = true
76+
}
77+
}
78+
79+
// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field.
80+
func TransformWithStatusLoadBalancer() func(options *TransformOptions) {
81+
return func(options *TransformOptions) {
82+
options.statusLb = true
83+
}
84+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package informers
15+
16+
import (
17+
"testing"
18+
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
corev1 "k8s.io/api/core/v1"
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
kubeinformers "k8s.io/client-go/informers"
24+
"k8s.io/client-go/kubernetes/fake"
25+
)
26+
27+
func TestTransformerWithOptions_Service(t *testing.T) {
28+
base := fakeService()
29+
30+
tests := []struct {
31+
name string
32+
options []func(*TransformOptions)
33+
asserts func(any)
34+
}{
35+
{
36+
name: "minimalistic object",
37+
options: nil,
38+
asserts: func(obj any) {
39+
svc, ok := obj.(*corev1.Service)
40+
assert.True(t, ok)
41+
assert.Empty(t, svc.UID)
42+
assert.NotEmpty(t, svc.Name)
43+
assert.NotEmpty(t, svc.Namespace)
44+
},
45+
},
46+
{
47+
name: "with selector",
48+
options: []func(*TransformOptions){TransformWithSpecSelector()},
49+
asserts: func(obj any) {
50+
svc, ok := obj.(*corev1.Service)
51+
assert.True(t, ok)
52+
assert.NotEmpty(t, svc.Spec.Selector)
53+
assert.Empty(t, svc.Spec.ExternalIPs)
54+
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
55+
},
56+
},
57+
{
58+
name: "with selector",
59+
options: []func(*TransformOptions){TransformWithSpecSelector()},
60+
asserts: func(obj any) {
61+
svc, ok := obj.(*corev1.Service)
62+
assert.True(t, ok)
63+
assert.NotEmpty(t, svc.Spec.Selector)
64+
assert.Empty(t, svc.Spec.ExternalIPs)
65+
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
66+
},
67+
},
68+
{
69+
name: "with loadBalancer",
70+
options: []func(*TransformOptions){TransformWithStatusLoadBalancer()},
71+
asserts: func(obj any) {
72+
svc, ok := obj.(*corev1.Service)
73+
assert.True(t, ok)
74+
assert.Empty(t, svc.Spec.Selector)
75+
assert.Empty(t, svc.Spec.ExternalIPs)
76+
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
77+
},
78+
},
79+
{
80+
name: "all options",
81+
options: []func(*TransformOptions){
82+
TransformWithSpecSelector(),
83+
TransformWithSpecExternalIPs(),
84+
TransformWithStatusLoadBalancer(),
85+
},
86+
asserts: func(obj any) {
87+
svc, ok := obj.(*corev1.Service)
88+
assert.True(t, ok)
89+
assert.NotEmpty(t, svc.Spec.Selector)
90+
assert.NotEmpty(t, svc.Spec.ExternalIPs)
91+
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
92+
},
93+
},
94+
}
95+
96+
for _, tt := range tests {
97+
t.Run(tt.name, func(t *testing.T) {
98+
transform := TransformerWithOptions[*corev1.Service](tt.options...)
99+
got, err := transform(base)
100+
require.NoError(t, err)
101+
tt.asserts(got)
102+
})
103+
}
104+
105+
t.Run("non-service input", func(t *testing.T) {
106+
transform := TransformerWithOptions[*corev1.Service]()
107+
out, err := transform("not-a-service")
108+
if err != nil {
109+
t.Fatalf("unexpected error: %v", err)
110+
}
111+
if out != nil {
112+
t.Errorf("expected nil output for non-service input, got %v", out)
113+
}
114+
})
115+
}
116+
117+
func TestTransformer_Service_WithFakeClient(t *testing.T) {
118+
t.Run("with transformer", func(t *testing.T) {
119+
ctx := t.Context()
120+
svc := fakeService()
121+
fakeClient := fake.NewClientset()
122+
123+
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
124+
require.NoError(t, err)
125+
126+
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
127+
serviceInformer := factory.Core().V1().Services()
128+
err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service](
129+
TransformWithSpecSelector(),
130+
TransformWithSpecExternalIPs(),
131+
TransformWithStatusLoadBalancer(),
132+
))
133+
require.NoError(t, err)
134+
135+
factory.Start(ctx.Done())
136+
err = WaitForCacheSync(ctx, factory)
137+
require.NoError(t, err)
138+
139+
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
140+
require.NoError(t, err)
141+
142+
assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
143+
assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
144+
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
145+
assert.NotEqual(t, svc.Annotations, got.Annotations)
146+
assert.NotEqual(t, svc.Labels, got.Labels)
147+
})
148+
149+
t.Run("without transformer", func(t *testing.T) {
150+
ctx := t.Context()
151+
svc := fakeService()
152+
fakeClient := fake.NewClientset()
153+
154+
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
155+
require.NoError(t, err)
156+
157+
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
158+
serviceInformer := factory.Core().V1().Services()
159+
160+
err = serviceInformer.Informer().GetIndexer().Add(svc)
161+
require.NoError(t, err)
162+
163+
factory.Start(ctx.Done())
164+
err = WaitForCacheSync(ctx, factory)
165+
require.NoError(t, err)
166+
167+
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
168+
require.NoError(t, err)
169+
170+
assert.Equal(t, map[string]string{"app": "demo"}, got.Spec.Selector)
171+
assert.Equal(t, []string{"1.2.3.4"}, got.Spec.ExternalIPs)
172+
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
173+
assert.Equal(t, svc.Annotations, got.Annotations)
174+
assert.Equal(t, svc.Labels, got.Labels)
175+
})
176+
}

source/istio_gateway.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import (
2828
istioclient "istio.io/client-go/pkg/clientset/versioned"
2929
istioinformers "istio.io/client-go/pkg/informers/externalversions"
3030
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
31+
corev1 "k8s.io/api/core/v1"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/labels"
3334
kubeinformers "k8s.io/client-go/informers"
3435
coreinformers "k8s.io/client-go/informers/core/v1"
3536
"k8s.io/client-go/kubernetes"
36-
"k8s.io/client-go/tools/cache"
3737

3838
"sigs.k8s.io/external-dns/endpoint"
3939
"sigs.k8s.io/external-dns/source/annotations"
@@ -84,30 +84,26 @@ func NewIstioGatewaySource(
8484
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
8585

8686
// Add default resource event handlers to properly initialize informer.
87-
_, _ = serviceInformer.Informer().AddEventHandler(
88-
cache.ResourceEventHandlerFuncs{
89-
AddFunc: func(obj interface{}) {
90-
log.Debug("service added")
91-
},
92-
},
93-
)
94-
95-
_, _ = gatewayInformer.Informer().AddEventHandler(
96-
cache.ResourceEventHandlerFuncs{
97-
AddFunc: func(obj interface{}) {
98-
log.Debug("gateway added")
99-
},
100-
},
101-
)
87+
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
88+
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
89+
informers.TransformWithSpecSelector(),
90+
informers.TransformWithSpecExternalIPs(),
91+
informers.TransformWithStatusLoadBalancer(),
92+
))
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
_, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
10298

10399
informerFactory.Start(ctx.Done())
104100
istioInformerFactory.Start(ctx.Done())
105101

106102
// wait for the local cache to be populated.
107-
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
103+
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
108104
return nil, err
109105
}
110-
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
106+
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
111107
return nil, err
112108
}
113109

0 commit comments

Comments
 (0)