Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions source/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

kubeinformers "k8s.io/client-go/informers"
Expand Down Expand Up @@ -76,6 +77,40 @@ func NewPodSource(
}

_, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler())

if fqdnTemplate == "" {
// Transformer is used to reduce the memory usage of the informer.
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
// When fqdnTemplate is used the entire pod needs to be provided to the rendering call, but the informer itself becomes unneeded.
podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
pod, ok := i.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
if pod.UID == "" {
// Pod was already transformed and we must be idempotent.
return pod, nil
}
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Name/namespace must always be kept for the informer to work.
Name: pod.Name,
Namespace: pod.Namespace,
// Used by the controller. This includes non-external-dns prefixed annotations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth to rebase with master branch. This most likely not going to behave correctly in cases when

  1. annotationFilter is present
  2. labelSelector is present

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me rebase and assess what we need to store

Annotations: pod.Annotations,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need annotation kubectl.kubernetes.io/last-applied-configuration? Just a question, not a big deal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good remark, we could at least drop this one. We don't use it internally so had no impact but definitely a potential memory horder for people using apply.
Will add this

},
Spec: corev1.PodSpec{
HostNetwork: pod.Spec.HostNetwork,
NodeName: pod.Spec.NodeName,
},
Status: corev1.PodStatus{
PodIP: pod.Status.PodIP,
},
}, nil
})
}

_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())

informerFactory.Start(ctx.Done())
Expand Down
145 changes: 145 additions & 0 deletions source/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -997,3 +998,147 @@ func nodesFixturesIPv4() []*corev1.Node {
},
}
}

func TestPodTransformerInPodSource(t *testing.T) {
t.Run("transformer set", func(t *testing.T) {
ctx := t.Context()
fakeClient := fake.NewClientset()

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "test",
}},
Hostname: "test-hostname",
NodeName: "test-node",
HostNetwork: true,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-name",
Labels: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
Annotations: map[string]string{
"user-annotation": "value",
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
"external-dns.alpha.kubernetes.io/random": "value",
"other/annotation": "value",
},
UID: "someuid",
},
Status: v1.PodStatus{
PodIP: "127.0.0.1",
HostIP: "127.0.0.2",
Conditions: []v1.PodCondition{{
Type: v1.PodReady,
Status: v1.ConditionTrue,
}, {
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
}},
},
}

_, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

// Should not error when creating the source
src, err := NewPodSource(ctx, fakeClient, "", "", false, "", "", false, "", nil)
require.NoError(t, err)
ps, ok := src.(*podSource)
require.True(t, ok)

retrieved, err := ps.podInformer.Lister().Pods("test-ns").Get("test-name")
require.NoError(t, err)

// Metadata
assert.Equal(t, "test-name", retrieved.Name)
assert.Equal(t, "test-ns", retrieved.Namespace)
assert.Empty(t, retrieved.UID)
assert.Empty(t, retrieved.Labels)
// Filtered
assert.Equal(t, map[string]string{
"user-annotation": "value",
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
"external-dns.alpha.kubernetes.io/random": "value",
"other/annotation": "value",
}, retrieved.Annotations)

// Spec
assert.Empty(t, retrieved.Spec.Containers)
assert.Empty(t, retrieved.Spec.Hostname)
assert.Equal(t, "test-node", retrieved.Spec.NodeName)
assert.True(t, retrieved.Spec.HostNetwork)

// Status
assert.Empty(t, retrieved.Status.ContainerStatuses)
assert.Empty(t, retrieved.Status.InitContainerStatuses)
assert.Empty(t, retrieved.Status.HostIP)
assert.Equal(t, "127.0.0.1", retrieved.Status.PodIP)
assert.Empty(t, retrieved.Status.Conditions)
})

t.Run("transformer is not used when fqdnTemplate is set", func(t *testing.T) {
ctx := t.Context()
fakeClient := fake.NewClientset()

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "test",
}},
Hostname: "test-hostname",
NodeName: "test-node",
HostNetwork: true,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-name",
Labels: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
Annotations: map[string]string{
"user-annotation": "value",
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
"external-dns.alpha.kubernetes.io/random": "value",
"other/annotation": "value",
},
UID: "someuid",
},
Status: v1.PodStatus{
PodIP: "127.0.0.1",
HostIP: "127.0.0.2",
Conditions: []v1.PodCondition{{
Type: v1.PodReady,
Status: v1.ConditionTrue,
}, {
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
}},
},
}

_, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

// Should not error when creating the source
src, err := NewPodSource(ctx, fakeClient, "", "", false, "", "template", false, "", nil)
require.NoError(t, err)
ps, ok := src.(*podSource)
require.True(t, ok)

retrieved, err := ps.podInformer.Lister().Pods("test-ns").Get("test-name")
require.NoError(t, err)

// Metadata
assert.Equal(t, "test-name", retrieved.Name)
assert.Equal(t, "test-ns", retrieved.Namespace)
assert.NotEmpty(t, retrieved.UID)
assert.NotEmpty(t, retrieved.Labels)
})
}
43 changes: 43 additions & 0 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
Expand Down Expand Up @@ -134,6 +135,48 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
if err != nil {
return nil, err
}

// Transformer is used to reduce the memory usage of the informer.
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
pod, ok := i.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
if pod.UID == "" {
// Pod was already transformed and we must be idempotent.
return pod, nil
}

// All pod level annotations we're interested in start with a common prefix
podAnnotations := map[string]string{}
for key, value := range pod.Annotations {
if strings.HasPrefix(key, annotations.AnnotationKeyPrefix) {
podAnnotations[key] = value
}
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Name/namespace must always be kept for the informer to work.
Name: pod.Name,
Namespace: pod.Namespace,
// Used to match services.
Labels: pod.Labels,
Annotations: podAnnotations,
DeletionTimestamp: pod.DeletionTimestamp,
},
Spec: v1.PodSpec{
Hostname: pod.Spec.Hostname,
NodeName: pod.Spec.NodeName,
},
Status: v1.PodStatus{
HostIP: pod.Status.HostIP,
Phase: pod.Status.Phase,
Conditions: pod.Status.Conditions,
},
}, nil
})
}

var nodeInformer coreinformers.NodeInformer
Expand Down
104 changes: 104 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4702,6 +4702,110 @@ func TestEndpointSlicesIndexer(t *testing.T) {
})
}

func TestPodTransformerInServiceSource(t *testing.T) {
ctx := t.Context()
fakeClient := fake.NewClientset()

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "test",
}},
Hostname: "test-hostname",
NodeName: "test-node",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-name",
Labels: map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
},
Annotations: map[string]string{
"user-annotation": "value",
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
"external-dns.alpha.kubernetes.io/random": "value",
"other/annotation": "value",
},
UID: "someuid",
},
Status: v1.PodStatus{
PodIP: "127.0.0.1",
HostIP: "127.0.0.2",
Conditions: []v1.PodCondition{{
Type: v1.PodReady,
Status: v1.ConditionTrue,
}, {
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
}},
},
}

_, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

// Should not error when creating the source
src, err := NewServiceSource(
ctx,
fakeClient,
"",
"",
"{{.Name}}",
false,
"",
false,
false,
false,
[]string{},
false,
labels.Everything(),
false,
false,
false,
)
require.NoError(t, err)
ss, ok := src.(*serviceSource)
require.True(t, ok)

retrieved, err := ss.podInformer.Lister().Pods("test-ns").Get("test-name")
require.NoError(t, err)

// Metadata
assert.Equal(t, "test-name", retrieved.Name)
assert.Equal(t, "test-ns", retrieved.Namespace)
assert.Empty(t, retrieved.UID)
assert.Equal(t, map[string]string{
"label1": "value1",
"label2": "value2",
"label3": "value3",
}, retrieved.Labels)
// Filtered
assert.Equal(t, map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
"external-dns.alpha.kubernetes.io/random": "value",
}, retrieved.Annotations)

// Spec
assert.Empty(t, retrieved.Spec.Containers)
assert.Equal(t, "test-hostname", retrieved.Spec.Hostname)
assert.Equal(t, "test-node", retrieved.Spec.NodeName)

// Status
assert.Empty(t, retrieved.Status.ContainerStatuses)
assert.Empty(t, retrieved.Status.InitContainerStatuses)
assert.Equal(t, "127.0.0.2", retrieved.Status.HostIP)
assert.Empty(t, retrieved.Status.PodIP)
assert.ElementsMatch(t, []v1.PodCondition{{
Type: v1.PodReady,
Status: v1.ConditionTrue,
}, {
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
}}, retrieved.Status.Conditions)
}

// createTestServicesByType creates the requested number of services per type in the given namespace.
func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]int) []*v1.Service {
var services []*v1.Service
Expand Down
Loading