Skip to content

Commit 7bc7a34

Browse files
committed
Use LinearCache to optimize StreamEndpoint discovery.
Signed-off-by: Tero Saarni <[email protected]>
1 parent 38346c5 commit 7bc7a34

File tree

4 files changed

+39
-47
lines changed

4 files changed

+39
-47
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve performance in clusters with a large number of endpoints by using go-control-plane LinearCache for EDS.

internal/xdscache/v3/endpointslicetranslator.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -334,25 +334,10 @@ func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) {
334334
// be removed. Since we reset the cluster cache above, all
335335
// the load assignments will be recalculated and we can just
336336
// set the entries rather than merging them.
337-
entries := e.cache.Recalculate()
337+
e.entries = e.cache.Recalculate()
338338

339-
// Only update and notify if entries has changed.
340-
changed := false
341-
342-
e.mu.Lock()
343-
if !equal(e.entries, entries) {
344-
e.entries = entries
345-
changed = true
346-
}
347-
e.mu.Unlock()
348-
349-
if changed {
350-
e.Debug("cluster load assignments changed, notifying waiters")
351-
if e.Observer != nil {
352-
e.Observer.Refresh()
353-
}
354-
} else {
355-
e.Debug("cluster load assignments did not change")
339+
if e.Observer != nil {
340+
e.Observer.Refresh()
356341
}
357342
}
358343

internal/xdscache/v3/endpointstranslator.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -309,25 +309,10 @@ func (e *EndpointsTranslator) OnChange(root *dag.DAG) {
309309
// be removed. Since we reset the cluster cache above, all
310310
// the load assignments will be recalculated and we can just
311311
// set the entries rather than merging them.
312-
entries := e.cache.Recalculate()
312+
e.entries = e.cache.Recalculate()
313313

314-
// Only update and notify if entries has changed.
315-
changed := false
316-
317-
e.mu.Lock()
318-
if !equal(e.entries, entries) {
319-
e.entries = entries
320-
changed = true
321-
}
322-
e.mu.Unlock()
323-
324-
if changed {
325-
e.Debug("cluster load assignments changed, notifying waiters")
326-
if e.Observer != nil {
327-
e.Observer.Refresh()
328-
}
329-
} else {
330-
e.Debug("cluster load assignments did not change")
314+
if e.Observer != nil {
315+
e.Observer.Refresh()
331316
}
332317
}
333318

internal/xdscache/v3/snapshot.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
type SnapshotHandler struct {
3636
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
3737
defaultCache envoy_cache_v3.SnapshotCache
38-
edsCache envoy_cache_v3.SnapshotCache
38+
edsCache *envoy_cache_v3.LinearCache
3939
mux *envoy_cache_v3.MuxCache
4040
log logrus.FieldLogger
4141
}
@@ -44,7 +44,9 @@ type SnapshotHandler struct {
4444
func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogger) *SnapshotHandler {
4545
var (
4646
defaultCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "defaultCache"))
47-
edsCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "edsCache"))
47+
edsCache = envoy_cache_v3.NewLinearCache(envoy_resource_v3.EndpointType,
48+
envoy_cache_v3.WithVersionPrefix(uuid.NewString()+"-"),
49+
envoy_cache_v3.WithLogger(log.WithField("context", "edsCache")))
4850

4951
mux = &envoy_cache_v3.MuxCache{
5052
Caches: map[string]envoy_cache_v3.Cache{},
@@ -89,21 +91,40 @@ func (s *SnapshotHandler) GetCache() envoy_cache_v3.Cache {
8991
// Refresh is called when the EndpointsTranslator updates values
9092
// in its cache. It updates the EDS cache.
9193
func (s *SnapshotHandler) Refresh() {
92-
version := uuid.NewString()
94+
previouslyNotifiedResources := s.edsCache.GetResources()
95+
currentResources := envoy_cache_v3.IndexRawResourcesByName(asResources(s.resources[envoy_resource_v3.EndpointType].Contents()))
96+
97+
toUpdate := make(map[string]envoy_types.Resource, len(currentResources))
98+
toDelete := make([]string, 0, len(previouslyNotifiedResources))
99+
100+
for resourceName, previousResource := range previouslyNotifiedResources {
101+
if newResource, ok := currentResources[resourceName]; ok {
102+
// Add resources that were updated.
103+
if !proto.Equal(newResource, previousResource) {
104+
toUpdate[resourceName] = newResource
105+
}
106+
} else {
107+
// Add resources that were deleted.
108+
toDelete = append(toDelete, resourceName)
109+
}
110+
}
93111

94-
resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
95-
envoy_resource_v3.EndpointType: asResources(s.resources[envoy_resource_v3.EndpointType].Contents()),
112+
// Add resources that are new.
113+
for resourceName, newResource := range currentResources {
114+
if _, ok := previouslyNotifiedResources[resourceName]; !ok {
115+
toUpdate[resourceName] = newResource
116+
}
96117
}
97118

98-
snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
99-
if err != nil {
100-
s.log.Errorf("failed to generate snapshot version %q: %s", version, err)
119+
if len(toUpdate) == 0 && len(toDelete) == 0 {
120+
s.log.Debug("no EDS resources to update")
101121
return
102122
}
103123

104-
if err := s.edsCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
105-
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
106-
return
124+
s.log.WithField("toUpdate", len(toUpdate)).WithField("toDelete", len(toDelete)).Debug("refreshing EDS cache")
125+
126+
if err := s.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
127+
s.log.WithError(err).Error("failed to update EDS cache")
107128
}
108129
}
109130

0 commit comments

Comments
 (0)