Skip to content

Commit a0da8cd

Browse files
Reachability performance optimziation (#4955)
* reduced iteration count to speed up benchmarks * optimize some System.Collections.Immutable invocations to allocate less * cleanup dictionary construction * fixed multiple enumeration bug in `Reachability`
1 parent 3f232d8 commit a0da8cd

File tree

3 files changed

+24
-27
lines changed

3 files changed

+24
-27
lines changed

src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Collections.Immutable;
34
using System.Linq;
45
using Akka.Util;
56
using Akka.Actor;
@@ -13,7 +14,7 @@ namespace Akka.Benchmarks.Cluster
1314
[Config(typeof(MicroBenchmarkConfig))]
1415
public class ReachabilityBenchmarks
1516
{
16-
[Params(10, 100, 250)]
17+
[Params(100)]
1718
public int NodesSize;
1819

1920
[Params(100)]
@@ -51,15 +52,15 @@ private Reachability AddUnreachable(Reachability baseReachability, int count)
5152
internal Reachability Reachability1;
5253
internal Reachability Reachability2;
5354
internal Reachability Reachability3;
54-
internal HashSet<UniqueAddress> Allowed;
55+
internal ImmutableHashSet<UniqueAddress> Allowed;
5556

5657
[GlobalSetup]
5758
public void Setup()
5859
{
5960
Reachability1 = CreateReachabilityOfSize(Reachability.Empty, NodesSize);
6061
Reachability2 = CreateReachabilityOfSize(Reachability1, NodesSize);
6162
Reachability3 = AddUnreachable(Reachability1, NodesSize / 2);
62-
Allowed = Reachability1.Versions.Keys.ToHashSet();
63+
Allowed = Reachability1.Versions.Keys.ToImmutableHashSet();
6364
}
6465

6566
private void CheckThunkFor(Reachability r1, Reachability r2, Action<Reachability, Reachability> thunk,

src/core/Akka.Cluster/Gossip.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public Gossip Merge(Gossip that)
282282
var mergedMembers = EmptyMembers.Union(Member.PickHighestPriority(this._members, that._members));
283283

284284
// 3. merge reachability table by picking records with highest version
285-
var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress),
285+
var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress).ToImmutableSortedSet(),
286286
that._overview.Reachability);
287287

288288
// 4. Nobody can have seen this new gossip yet

src/core/Akka.Cluster/Reachability.cs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public enum ReachabilityStatus
5555
public static readonly Reachability Empty =
5656
new Reachability(ImmutableList.Create<Record>(), ImmutableDictionary.Create<UniqueAddress, long>());
5757

58-
//TODO: Serialization should ignore
5958
private readonly Lazy<Cache> _cache;
6059

6160
/// <summary>
@@ -80,11 +79,6 @@ public Reachability(ImmutableList<Record> records, ImmutableDictionary<UniqueAdd
8079
/// </summary>
8180
public ImmutableDictionary<UniqueAddress, long> Versions { get; }
8281

83-
/*
84-
* def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean =
85-
status(observer, subject) == Reachable
86-
*/
87-
8882
/// <summary>
8983
/// TBD
9084
/// </summary>
@@ -178,7 +172,11 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach
178172
var newVersions = Versions.SetItem(observer, v);
179173
var newRecord = new Record(observer, subject, status, v);
180174
var oldObserverRows = ObserverRows(observer);
175+
176+
// don't record Reachable observation if nothing has been noted so far
181177
if (oldObserverRows == null && status == ReachabilityStatus.Reachable) return this;
178+
179+
// otherwise, create new instance including this first observation
182180
if (oldObserverRows == null) return new Reachability(Records.Add(newRecord), newVersions);
183181

184182
if (!oldObserverRows.TryGetValue(subject, out var oldRecord))
@@ -206,7 +204,7 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach
206204
/// <param name="allowed">TBD</param>
207205
/// <param name="other">TBD</param>
208206
/// <returns>TBD</returns>
209-
public Reachability Merge(IEnumerable<UniqueAddress> allowed, Reachability other)
207+
public Reachability Merge(IImmutableSet<UniqueAddress> allowed, Reachability other)
210208
{
211209
var recordBuilder = ImmutableList.CreateBuilder<Record>();
212210
//TODO: Size hint somehow?
@@ -337,7 +335,7 @@ public bool IsReachable(UniqueAddress observer, UniqueAddress subject)
337335
public ImmutableHashSet<UniqueAddress> AllUnreachableFrom(UniqueAddress observer)
338336
{
339337
var observerRows = ObserverRows(observer);
340-
if (observerRows == null) return ImmutableHashSet.Create<UniqueAddress>();
338+
if (observerRows == null) return ImmutableHashSet<UniqueAddress>.Empty;
341339
return
342340
ImmutableHashSet.CreateRange(
343341
observerRows.Where(p => p.Value.Status == ReachabilityStatus.Unreachable).Select(p => p.Key));
@@ -351,16 +349,18 @@ public ImmutableHashSet<UniqueAddress> AllUnreachableFrom(UniqueAddress observer
351349
public ImmutableList<Record> RecordsFrom(UniqueAddress observer)
352350
{
353351
var rows = ObserverRows(observer);
354-
if (rows == null) return ImmutableList.Create<Record>();
352+
if (rows == null) return ImmutableList<Record>.Empty;
355353
return rows.Values.ToImmutableList();
356354
}
357355

356+
/// only used for testing
358357
/// <inheritdoc />
359358
public override int GetHashCode()
360359
{
361360
return Versions.GetHashCode();
362361
}
363362

363+
/// only used for testing
364364
/// <inheritdoc />
365365
public override bool Equals(object obj)
366366
{
@@ -467,10 +467,10 @@ public Cache(ImmutableList<Record> records)
467467
{
468468
if (records.IsEmpty)
469469
{
470-
ObserverRowMap = ImmutableDictionary
471-
.Create<UniqueAddress, ImmutableDictionary<UniqueAddress, Record>>();
472-
AllTerminated = ImmutableHashSet.Create<UniqueAddress>();
473-
AllUnreachable = ImmutableHashSet.Create<UniqueAddress>();
470+
ObserverRowMap = ImmutableDictionary<UniqueAddress, ImmutableDictionary<UniqueAddress, Record>>
471+
.Empty;
472+
AllTerminated = ImmutableHashSet<UniqueAddress>.Empty;
473+
AllUnreachable = ImmutableHashSet<UniqueAddress>.Empty;
474474
}
475475
else
476476
{
@@ -480,16 +480,12 @@ public Cache(ImmutableList<Record> records)
480480

481481
foreach (var r in records)
482482
{
483-
ImmutableDictionary<UniqueAddress, Record> m = mapBuilder.TryGetValue(r.Observer, out m)
484-
? m.SetItem(r.Subject, r)
485-
//TODO: Other collections take items for Create. Create unnecessary array here
486-
: ImmutableDictionary.CreateRange(new[]
487-
{
488-
new KeyValuePair<UniqueAddress, Record>(r.Subject, r)
489-
});
483+
ImmutableDictionary<UniqueAddress, Record> m = mapBuilder.TryGetValue(r.Observer, out var mR)
484+
? mR.SetItem(r.Subject, r)
485+
: ImmutableDictionary<UniqueAddress, Record>.Empty.Add(r.Subject, r);
490486

491487

492-
mapBuilder.AddOrSet(r.Observer, m);
488+
mapBuilder[r.Observer] = m;
493489

494490
if (r.Status == ReachabilityStatus.Unreachable) unreachableBuilder.Add(r.Subject);
495491
else if (r.Status == ReachabilityStatus.Terminated) terminatedBuilder.Add(r.Subject);
@@ -514,12 +510,12 @@ public ImmutableDictionary<UniqueAddress, ImmutableDictionary<UniqueAddress, Rec
514510
}
515511

516512
/// <summary>
517-
/// TBD
513+
/// Contains all nodes that have been observed as Terminated by at least one other node.
518514
/// </summary>
519515
public ImmutableHashSet<UniqueAddress> AllTerminated { get; }
520516

521517
/// <summary>
522-
/// TBD
518+
/// Contains all nodes that have been observed as Unreachable by at least one other node.
523519
/// </summary>
524520
public ImmutableHashSet<UniqueAddress> AllUnreachable { get; }
525521

0 commit comments

Comments
 (0)