Skip to content

Commit 9556b6a

Browse files
authored
Use LinearCache to optimize StreamEndpoint discovery. (#6906)
Signed-off-by: Tero Saarni <[email protected]>
1 parent f3c4fbf commit 9556b6a

File tree

3 files changed

+39
-29
lines changed

3 files changed

+39
-29
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve performance in clusters with a large number of endpoints by using go-control-plane LinearCache for EDS.

internal/xdscache/v3/endpointslicetranslator.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -339,25 +339,10 @@ func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) {
339339
// be removed. Since we reset the cluster cache above, all
340340
// the load assignments will be recalculated and we can just
341341
// set the entries rather than merging them.
342-
entries := e.cache.Recalculate()
342+
e.entries = e.cache.Recalculate()
343343

344-
// Only update and notify if entries has changed.
345-
changed := false
346-
347-
e.mu.Lock()
348-
if !equal(e.entries, entries) {
349-
e.entries = entries
350-
changed = true
351-
}
352-
e.mu.Unlock()
353-
354-
if changed {
355-
e.Debug("cluster load assignments changed, notifying waiters")
356-
if e.Observer != nil {
357-
e.Observer.Refresh()
358-
}
359-
} else {
360-
e.Debug("cluster load assignments did not change")
344+
if e.Observer != nil {
345+
e.Observer.Refresh()
361346
}
362347
}
363348

internal/xdscache/v3/snapshot.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
type SnapshotHandler struct {
3636
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
3737
defaultCache envoy_cache_v3.SnapshotCache
38-
edsCache envoy_cache_v3.SnapshotCache
38+
edsCache *envoy_cache_v3.LinearCache
3939
mux *envoy_cache_v3.MuxCache
4040
log logrus.FieldLogger
4141
}
@@ -44,7 +44,12 @@ type SnapshotHandler struct {
4444
func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogger) *SnapshotHandler {
4545
var (
4646
defaultCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "defaultCache"))
47-
edsCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "edsCache"))
47+
// Envoy will open EDS stream per CDS entry.
48+
// LinearCache mitigates the issue where all EDS streams are notified of any endpoint changes, even if unrelated to the requested resources.
49+
// With LinearCache, updates are sent only for resources explicitly requested by Envoy.
50+
edsCache = envoy_cache_v3.NewLinearCache(envoy_resource_v3.EndpointType,
51+
envoy_cache_v3.WithVersionPrefix(uuid.NewString()+"-"),
52+
envoy_cache_v3.WithLogger(log.WithField("context", "edsCache")))
4853

4954
mux = &envoy_cache_v3.MuxCache{
5055
Caches: map[string]envoy_cache_v3.Cache{},
@@ -89,21 +94,40 @@ func (s *SnapshotHandler) GetCache() envoy_cache_v3.Cache {
8994
// Refresh is called when the EndpointSliceTranslator updates values
9095
// in its cache. It updates the EDS cache.
9196
func (s *SnapshotHandler) Refresh() {
92-
version := uuid.NewString()
97+
previouslyNotifiedResources := s.edsCache.GetResources()
98+
currentResources := envoy_cache_v3.IndexRawResourcesByName(asResources(s.resources[envoy_resource_v3.EndpointType].Contents()))
99+
100+
toUpdate := make(map[string]envoy_types.Resource, len(currentResources))
101+
toDelete := make([]string, 0, len(previouslyNotifiedResources))
102+
103+
for resourceName, previousResource := range previouslyNotifiedResources {
104+
if newResource, ok := currentResources[resourceName]; ok {
105+
// Add resources that were updated.
106+
if !proto.Equal(newResource, previousResource) {
107+
toUpdate[resourceName] = newResource
108+
}
109+
} else {
110+
// Add resources that were deleted.
111+
toDelete = append(toDelete, resourceName)
112+
}
113+
}
93114

94-
resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
95-
envoy_resource_v3.EndpointType: asResources(s.resources[envoy_resource_v3.EndpointType].Contents()),
115+
// Add resources that are new.
116+
for resourceName, newResource := range currentResources {
117+
if _, ok := previouslyNotifiedResources[resourceName]; !ok {
118+
toUpdate[resourceName] = newResource
119+
}
96120
}
97121

98-
snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
99-
if err != nil {
100-
s.log.Errorf("failed to generate snapshot version %q: %s", version, err)
122+
if len(toUpdate) == 0 && len(toDelete) == 0 {
123+
s.log.Debug("no EDS resources to update")
101124
return
102125
}
103126

104-
if err := s.edsCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
105-
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
106-
return
127+
s.log.WithField("toUpdate", len(toUpdate)).WithField("toDelete", len(toDelete)).Debug("refreshing EDS cache")
128+
129+
if err := s.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
130+
s.log.WithError(err).Error("failed to update EDS cache")
107131
}
108132
}
109133

0 commit comments

Comments
 (0)