Skip to content

Commit b50e46f

Browse files
authored
Fix connection bugs from BalancerAddress changes (#2265)
1 parent 311f878 commit b50e46f

File tree

4 files changed

+71
-8
lines changed

4 files changed

+71
-8
lines changed

src/Grpc.Net.Client/Balancer/Subchannel.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#endregion
1818

1919
#if SUPPORT_LOAD_BALANCING
20+
using System.Diagnostics;
2021
using System.Net;
2122
using Grpc.Core;
2223
using Grpc.Net.Client.Balancer.Internal;
@@ -173,6 +174,10 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
173174
return;
174175
}
175176

177+
// Get a copy of the current address before updating addresses.
178+
// Updating addresses to not contain this value changes the property to return null.
179+
var currentAddress = CurrentAddress;
180+
176181
_addresses.Clear();
177182
_addresses.AddRange(addresses);
178183

@@ -186,11 +191,11 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
186191
requireReconnect = true;
187192
break;
188193
case ConnectivityState.Ready:
189-
// Transport uses the subchannel lock but take copy in an abundance of caution.
190-
var currentAddress = CurrentAddress;
194+
// Check if the subchannel is connected to an address that's not longer present.
195+
// In this situation require the subchannel to reconnect to a new address.
191196
if (currentAddress != null)
192197
{
193-
if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) != null)
198+
if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) is null)
194199
{
195200
SubchannelLog.ConnectedAddressNotInUpdatedAddresses(_logger, Id, currentAddress);
196201
requireReconnect = true;

src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#endregion
1818

1919
#if SUPPORT_LOAD_BALANCING
20+
using System.Diagnostics;
2021
using Grpc.Core;
2122
using Grpc.Net.Client.Balancer.Internal;
2223
using Microsoft.Extensions.Logging;
@@ -144,8 +145,11 @@ public override void UpdateChannelState(ChannelState state)
144145
// The new subchannel address has the same endpoint so the connection isn't impacted.
145146
if (!BalancerAddressEqualityComparer.Instance.Equals(address, newOrCurrentSubchannel.Address))
146147
{
148+
newOrCurrentSubchannel = new AddressSubchannel(
149+
newOrCurrentSubchannel.Subchannel,
150+
address,
151+
newOrCurrentSubchannel.LastKnownState);
147152
newOrCurrentSubchannel.Subchannel.UpdateAddresses(new[] { address });
148-
newOrCurrentSubchannel = new AddressSubchannel(newOrCurrentSubchannel.Subchannel, address);
149153
}
150154

151155
SubchannelLog.SubchannelPreserved(_logger, newOrCurrentSubchannel.Subchannel.Id, address);
@@ -306,15 +310,16 @@ protected override void Dispose(bool disposing)
306310
/// <returns>A subchannel picker.</returns>
307311
protected abstract SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels);
308312

313+
[DebuggerDisplay("Subchannel = {Subchannel.Id}, Address = {Address}, LastKnownState = {LastKnownState}")]
309314
private sealed class AddressSubchannel
310315
{
311316
private ConnectivityState _lastKnownState;
312317

313-
public AddressSubchannel(Subchannel subchannel, BalancerAddress address)
318+
public AddressSubchannel(Subchannel subchannel, BalancerAddress address, ConnectivityState lastKnownState = ConnectivityState.Idle)
314319
{
315320
Subchannel = subchannel;
316321
Address = address;
317-
_lastKnownState = ConnectivityState.Idle;
322+
_lastKnownState = lastKnownState;
318323
}
319324

320325
// Track connectivity state that has been updated to load balancer.

test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,18 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown()
5656
});
5757

5858
services.AddSingleton<ResolverFactory>(new TestResolverFactory(resolver));
59-
services.AddSingleton<ISubchannelTransportFactory>(new TestSubchannelTransportFactory());
59+
60+
var subChannelConnections = new List<Subchannel>();
61+
var transportFactory = new TestSubchannelTransportFactory((s, c) =>
62+
{
63+
lock (subChannelConnections)
64+
{
65+
subChannelConnections.Add(s);
66+
}
67+
return Task.FromResult(new TryConnectResult(ConnectivityState.Ready));
68+
});
69+
services.AddSingleton<ISubchannelTransportFactory>(transportFactory);
70+
6071
var serviceProvider = services.BuildServiceProvider();
6172
var logger = serviceProvider.GetRequiredService<ILoggerProvider>().CreateLogger(GetType().FullName!);
6273

@@ -95,6 +106,12 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown()
95106
Assert.AreEqual(1, subchannels[0]._addresses.Count);
96107
Assert.AreEqual(new DnsEndPoint("localhost", 81), subchannels[0]._addresses[0].EndPoint);
97108
Assert.AreEqual(ConnectivityState.Ready, subchannels[0].State);
109+
110+
lock (subChannelConnections)
111+
{
112+
Assert.AreEqual(2, subChannelConnections.Count);
113+
Assert.AreSame(subChannelConnections[0], subChannelConnections[1]);
114+
}
98115
}
99116

100117
[Test]

test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
300300

301301
var connectState = ConnectivityState.Ready;
302302

303-
var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(new TryConnectResult(connectState)));
303+
var subChannelConnections = new List<Subchannel>();
304+
var transportFactory = new TestSubchannelTransportFactory((s, c) =>
305+
{
306+
lock (subChannelConnections)
307+
{
308+
subChannelConnections.Add(s);
309+
}
310+
return Task.FromResult(new TryConnectResult(connectState));
311+
});
304312
services.AddSingleton<TestResolver>(s =>
305313
{
306314
return new TestResolver(
@@ -351,9 +359,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
351359
Assert.AreEqual(new DnsEndPoint("localhost", 82), subchannels[2]._addresses[0].EndPoint);
352360

353361
// Preserved because port 81, 82 is in both refresh results
362+
var discardedSubchannel = subchannels[0];
354363
var preservedSubchannel1 = subchannels[1];
355364
var preservedSubchannel2 = subchannels[2];
356365

366+
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(
367+
serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(GetType()),
368+
channel.ConnectionManager,
369+
expectedCount: 3).DefaultTimeout();
370+
357371
var address2 = new BalancerAddress("localhost", 82);
358372
address2.Attributes.Set(new BalancerAttributesKey<int>("test"), 1);
359373

@@ -364,7 +378,13 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
364378
new BalancerAddress("localhost", 83)
365379
});
366380

381+
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(
382+
serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(GetType()),
383+
channel.ConnectionManager,
384+
expectedCount: 3).DefaultTimeout();
385+
367386
subchannels = channel.ConnectionManager.GetSubchannels();
387+
var newSubchannel = subchannels[2];
368388
Assert.AreEqual(3, subchannels.Count);
369389

370390
Assert.AreEqual(1, subchannels[0]._addresses.Count);
@@ -379,6 +399,22 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
379399

380400
// Test that the channel's address was updated with new attribute with new attributes.
381401
Assert.AreSame(preservedSubchannel2.CurrentAddress, address2);
402+
403+
lock (subChannelConnections)
404+
{
405+
try
406+
{
407+
Assert.AreEqual(4, subChannelConnections.Count);
408+
Assert.Contains(discardedSubchannel, subChannelConnections);
409+
Assert.Contains(preservedSubchannel1, subChannelConnections);
410+
Assert.Contains(preservedSubchannel2, subChannelConnections);
411+
Assert.Contains(newSubchannel, subChannelConnections);
412+
}
413+
catch (Exception ex)
414+
{
415+
throw new Exception("Connected subchannels: " + Environment.NewLine + string.Join(Environment.NewLine, subChannelConnections), ex);
416+
}
417+
}
382418
}
383419
}
384420
#endif

0 commit comments

Comments
 (0)