Skip to content

Commit 5251f36

Browse files
Refactored Gossip into MembershipState (#4968)
* refactor Gossip class into `MembershipState` port of akka/akka#23291 * completed `MembershipState` port * fixed some downed observers calls * forgot to copy gossip upon `Welcome` from Leader * forgot to copy `MembershipState` while calling `UpdateLatestGossip` * refactored all DOWN-ing logic to live inside `Gossip` class * added some additional methods onto `MembershipState` * fixed ValidNodeForGossip bug * fixed equality check for Reachability should be quality by reference, not by value Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 953fc0f commit 5251f36

File tree

10 files changed

+543
-428
lines changed

10 files changed

+543
-428
lines changed

src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,25 @@ public class ClusterDomainEventPublisherSpec : AkkaSpec
3535
static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP"));
3636

3737
static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress);
38+
static readonly MembershipState state0 = new MembershipState(g0, aUp.UniqueAddress);
3839
static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress);
40+
static readonly MembershipState state1 = new MembershipState(g1, aUp.UniqueAddress);
3941
static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
42+
static readonly MembershipState state2 = new MembershipState(g2, aUp.UniqueAddress);
4043
static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
44+
static readonly MembershipState state3 = new MembershipState(g3, aUp.UniqueAddress);
4145
static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
46+
static readonly MembershipState state4 = new MembershipState(g4, aUp.UniqueAddress);
4247
static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
48+
static readonly MembershipState state5 = new MembershipState(g5, aUp.UniqueAddress);
4349
static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress);
50+
static readonly MembershipState state6 = new MembershipState(g6, aUp.UniqueAddress);
4451
static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress);
52+
static readonly MembershipState state7 = new MembershipState(g7, aUp.UniqueAddress);
4553
static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress);
54+
static readonly MembershipState state8 = new MembershipState(g8, aUp.UniqueAddress);
4655

56+
static readonly MembershipState _emptyMembershipState = new MembershipState(Gossip.Empty, aUp.UniqueAddress);
4757
readonly TestProbe _memberSubscriber;
4858

4959
public ClusterDomainEventPublisherSpec() : base(Config)
@@ -54,31 +64,31 @@ public ClusterDomainEventPublisherSpec() : base(Config)
5464
Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.ClusterShuttingDown));
5565

5666
_publisher = Sys.ActorOf(Props.Create<ClusterDomainEventPublisher>());
57-
_publisher.Tell(new InternalClusterAction.PublishChanges(g0));
67+
_publisher.Tell(new InternalClusterAction.PublishChanges(state0));
5868
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(aUp));
5969
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(aUp.Address));
6070
}
6171

6272
[Fact]
6373
public void ClusterDomainEventPublisher_must_publish_MemberJoined()
6474
{
65-
_publisher.Tell(new InternalClusterAction.PublishChanges(g1));
75+
_publisher.Tell(new InternalClusterAction.PublishChanges(state1));
6676
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining));
6777
}
6878

6979
[Fact]
7080
public void ClusterDomainEventPublisher_must_publish_MemberUp()
7181
{
72-
_publisher.Tell(new InternalClusterAction.PublishChanges(g2));
73-
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
82+
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
83+
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
7484
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
7585
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
7686
}
7787

7888
[Fact]
7989
public void ClusterDomainEventPublisher_must_publish_leader_changed()
8090
{
81-
_publisher.Tell(new InternalClusterAction.PublishChanges(g4));
91+
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
8292
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
8393
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
8494
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
@@ -89,17 +99,17 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed()
8999
[Fact]
90100
public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
91101
{
92-
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
102+
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
93103
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
94104
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
95-
_publisher.Tell(new InternalClusterAction.PublishChanges(g6));
105+
_publisher.Tell(new InternalClusterAction.PublishChanges(state6));
96106
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving));
97-
_publisher.Tell(new InternalClusterAction.PublishChanges(g7));
107+
_publisher.Tell(new InternalClusterAction.PublishChanges(state7));
98108
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting));
99109
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address));
100110
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
101111
// at the removed member a an empty gossip is the last thing
102-
_publisher.Tell(new InternalClusterAction.PublishChanges(Gossip.Empty));
112+
_publisher.Tell(new InternalClusterAction.PublishChanges(_emptyMembershipState));
103113
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
104114
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
105115
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
@@ -109,13 +119,13 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_lea
109119
[Fact]
110120
public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
111121
{
112-
_publisher.Tell(new InternalClusterAction.PublishChanges(g4));
122+
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
113123
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
114124
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
115125
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
116126
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));
117127

118-
_publisher.Tell(new InternalClusterAction.PublishChanges(g5));
128+
_publisher.Tell(new InternalClusterAction.PublishChanges(state5));
119129
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
120130
}
121131

@@ -125,9 +135,9 @@ public void ClusterDomainEventPublisher_must_publish_role_leader_changed()
125135
var subscriber = CreateTestProbe();
126136
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged))));
127137
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
128-
_publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cJoining, dUp))));
138+
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress)));
129139
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
130-
_publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cUp, dUp))));
140+
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress)));
131141
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
132142
}
133143

@@ -145,7 +155,7 @@ public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subsc
145155
public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
146156
{
147157
var subscriber = CreateTestProbe();
148-
_publisher.Tell(new InternalClusterAction.PublishChanges(g8));
158+
_publisher.Tell(new InternalClusterAction.PublishChanges(state8));
149159
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent))));
150160

151161
subscriber.ReceiveN(4).Should().BeEquivalentTo(
@@ -165,7 +175,7 @@ public void ClusterDomainEventPublisher_should_support_unsubscribe()
165175
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent))));
166176
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
167177
_publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent)));
168-
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
178+
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
169179
subscriber.ExpectNoMsg(500.Milliseconds());
170180
// but memberSubscriber is still subscriber
171181
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
@@ -178,10 +188,10 @@ public void ClusterDomainEventPublisher_must_publish_seen_changed()
178188
var subscriber = CreateTestProbe();
179189
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged))));
180190
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
181-
_publisher.Tell(new InternalClusterAction.PublishChanges(g2));
191+
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
182192
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
183193
subscriber.ExpectNoMsg(500.Milliseconds());
184-
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
194+
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
185195
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
186196
subscriber.ExpectNoMsg(500.Milliseconds());
187197
}

0 commit comments

Comments
 (0)