Skip to content

Commit 1b2d9ce

Browse files
committed
Port to 1.38.1
1 parent 253b088 commit 1b2d9ce

File tree

141 files changed

+2550
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

141 files changed

+2550
-235
lines changed

driver/Aeron.Driver.nuspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<package >
33
<metadata>
44
<id>Aeron.Driver</id>
5-
<version>1.37.0</version>
5+
<version>1.38.1</version>
66
<title>Aeron Driver</title>
77
<authors>Adaptive Financial Consulting Ltd.</authors>
88
<owners>Adaptive Financial Consulting Ltd.</owners>

driver/media-driver.jar

61.3 KB
Binary file not shown.

src/Adaptive.Aeron.Tests/PublicationTest.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void SetUp()
8686

8787
LogBufferDescriptor.InitialiseTailWithTermId(_logMetaDataBuffer, PartionIndex, TermID1);
8888

89-
A.CallTo(() => _conductor.ReleasePublication(_publication)).Invokes(() => _publication.InternalClose());
89+
A.CallTo(() => _conductor.RemovePublication(_publication)).Invokes(() => _publication.InternalClose());
9090
}
9191

9292
[Test]
@@ -95,7 +95,7 @@ public void ShouldEnsureThePublicationIsOpenBeforeReadingPosition()
9595
_publication.Dispose();
9696
Assert.AreEqual(Publication.CLOSED, _publication.Position);
9797

98-
A.CallTo(() => _conductor.ReleasePublication(_publication)).MustHaveHappened();
98+
A.CallTo(() => _conductor.RemovePublication(_publication)).MustHaveHappened();
9999
}
100100

101101
[Test]
@@ -147,7 +147,21 @@ public void ShouldReleasePublicationOnClose()
147147
{
148148
_publication.Dispose();
149149

150-
A.CallTo(() => _conductor.ReleasePublication(_publication)).MustHaveHappened();
150+
A.CallTo(() => _conductor.RemovePublication(_publication)).MustHaveHappened();
151+
}
152+
153+
[Test]
154+
public void ShouldReturnErrorMessages()
155+
{
156+
Assert.Equals("NOT_CONNECTED", Publication.ErrorString(-1L));
157+
Assert.Equals("BACK_PRESSURED", Publication.ErrorString(-2L));
158+
Assert.Equals("ADMIN_ACTION", Publication.ErrorString(-3L));
159+
Assert.Equals("CLOSED", Publication.ErrorString(-4L));
160+
Assert.Equals("MAX_POSITION_EXCEEDED", Publication.ErrorString(-5L));
161+
Assert.Equals("NONE", Publication.ErrorString(0L));
162+
Assert.Equals("NONE", Publication.ErrorString(1L));
163+
Assert.Equals("UNKNOWN", Publication.ErrorString(-6L));
164+
Assert.Equals("UNKNOWN", Publication.ErrorString(long.MinValue));
151165
}
152166
}
153167
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using Adaptive.Aeron.Security;
3+
using Adaptive.Agrona;
4+
using FakeItEasy;
5+
using NUnit.Framework;
6+
7+
namespace Adaptive.Aeron.Tests.Security
8+
{
9+
public class authorisation_service_test
10+
{
11+
[Test]
12+
public void ShouldAllowAnyCommandIfAllowAllIsUsed()
13+
{
14+
byte[] encodedCredentials = {0x1, 0x2, 0x3};
15+
var errorHandler = A.Fake<ErrorHandler>();
16+
const int protocolId = 77;
17+
int actionId = new Random().Next();
18+
19+
Assert.True(AllowAllAuthorisationService.INSTANCE.IsAuthorised(protocolId, actionId, null, encodedCredentials));
20+
A.CallTo(errorHandler).MustNotHaveHappened();
21+
}
22+
23+
[Test]
24+
public void ShouldForbidAllCommandsIfDenyAllIsUsed()
25+
{
26+
byte[] encodedCredentials = {0x4, 0x5, 0x6};
27+
var errorHandler = A.Fake<ErrorHandler>();
28+
const int protocolId = 77;
29+
int actionId = new Random().Next();
30+
31+
Assert.False(DenyAllAuthorisationService.INSTANCE.IsAuthorised(protocolId, actionId, null, encodedCredentials));
32+
A.CallTo(errorHandler).MustNotHaveHappened();
33+
}
34+
}
35+
36+
}

src/Adaptive.Aeron.Tests/SubscriptionTest.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void Setup()
7878
AvailableImageHandler,
7979
UnavailableImageHandler);
8080

81-
A.CallTo(() => Conductor.ReleaseSubscription(Subscription)).Invokes(() => Subscription.InternalClose(Aeron.NULL_VALUE));
81+
A.CallTo(() => Conductor.RemoveSubscription(Subscription)).Invokes(() => Subscription.InternalClose(Aeron.NULL_VALUE));
8282
}
8383

8484
[Test]
@@ -87,7 +87,7 @@ public void ShouldEnsureTheSubscriptionIsOpenWhenPolling()
8787
Subscription.Dispose();
8888
Assert.True(Subscription.IsClosed);
8989

90-
A.CallTo(() => Conductor.ReleaseSubscription(Subscription)).MustHaveHappened();
90+
A.CallTo(() => Conductor.RemoveSubscription(Subscription)).MustHaveHappened();
9191
}
9292

9393
[Test]

src/Adaptive.Aeron/Adaptive.Aeron.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<PropertyGroup>
33
<TargetFramework>netstandard2.0</TargetFramework>
44
<PackageId>Aeron.Client</PackageId>
5-
<VersionPrefix>1.37.0</VersionPrefix>
5+
<VersionPrefix>1.38.1</VersionPrefix>
66
<Authors>Adaptive Financial Consulting Ltd.</Authors>
77
<Company>Adaptive Financial Consulting Ltd.</Company>
88
<Product>Aeron Client</Product>

src/Adaptive.Aeron/Aeron.cs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,17 @@ public long AsyncAddPublication(string channel, int streamId)
257257
{
258258
return _conductor.AsyncAddPublication(channel, streamId);
259259
}
260+
261+
/// <summary>
262+
/// Asynchronously remove a <seealso cref="Publication"/>.
263+
/// </summary>
264+
/// <param name="registrationId"> to be of the publication removed. </param>
265+
/// <seealso cref="AsyncAddPublication(String, int)"/>
266+
/// <seealso cref="AsyncAddExclusivePublication(String, int)"/>
267+
public void AsyncRemovePublication(long registrationId)
268+
{
269+
_conductor.RemovePublication(registrationId);
270+
}
260271

261272
/// <summary>
262273
/// Asynchronously add a <seealso cref="Publication"/> for publishing messages to subscribers from a single thread.
@@ -948,6 +959,30 @@ static Context()
948959
/// Placeholder value to use in URIs to specify that a timestamp should be stored in the reserved value field.
949960
/// </summary>
950961
public const string RESERVED_OFFSET = "reserved";
962+
963+
/// <summary>
964+
/// Property name for a fallback PrintStream based logger when it is not possible to use the error logging
965+
/// callback. Supported values are stdout, stderr, no_op (stderr is the default).
966+
/// </summary>
967+
public const string FALLBACK_LOGGER_PROP_NAME = "aeron.fallback.logger";
968+
969+
/// <summary>
970+
/// Get the current fallback logger based on the supplied property.
971+
/// </summary>
972+
/// <returns> the configured PrintStream. </returns>
973+
public static TextWriter FallbackLogger()
974+
{
975+
string fallbackLoggerName = Config.GetProperty(FALLBACK_LOGGER_PROP_NAME, "stderr");
976+
switch (fallbackLoggerName)
977+
{
978+
case "stdout":
979+
return Console.Out;
980+
981+
case "stderr":
982+
default:
983+
return Console.Error;
984+
}
985+
}
951986

952987
/// <summary>
953988
/// Get the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. This will take
@@ -1999,7 +2034,7 @@ public MappedByteBuffer MapExistingCncFile(Action<string> logProgress)
19992034
{
20002035
FileInfo cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, CncFileDescriptor.CNC_FILE));
20012036

2002-
if (cncFile.Exists && cncFile.Length > 0)
2037+
if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
20032038
{
20042039
if (null != logProgress)
20052040
{
@@ -2023,7 +2058,7 @@ public static bool IsDriverActive(DirectoryInfo directory, long driverTimeoutMs,
20232058
{
20242059
FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, CncFileDescriptor.CNC_FILE));
20252060

2026-
if (cncFile.Exists && cncFile.Length > 0)
2061+
if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
20272062
{
20282063
logger("INFO: Aeron CnC file " + cncFile + " exists");
20292064

@@ -2120,23 +2155,26 @@ public static bool RequestDriverTermination(
21202155
{
21212156
FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, CncFileDescriptor.CNC_FILE));
21222157

2123-
if (cncFile.Exists && cncFile.Length > 0)
2158+
if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
21242159
{
21252160
var cncByteBuffer = IoUtil.MapExistingFile(cncFile, "CnC file");
21262161
try
21272162
{
21282163
UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
21292164
int cncVersion = cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0));
21302165

2131-
CncFileDescriptor.CheckVersion(cncVersion);
2166+
if (cncVersion > 0)
2167+
{
2168+
CncFileDescriptor.CheckVersion(cncVersion);
21322169

2133-
ManyToOneRingBuffer toDriverBuffer =
2134-
new ManyToOneRingBuffer(
2135-
CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, cncMetaDataBuffer));
2136-
long clientId = toDriverBuffer.NextCorrelationId();
2137-
DriverProxy driverProxy = new DriverProxy(toDriverBuffer, clientId);
2170+
ManyToOneRingBuffer toDriverBuffer =
2171+
new ManyToOneRingBuffer(
2172+
CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, cncMetaDataBuffer));
2173+
long clientId = toDriverBuffer.NextCorrelationId();
2174+
DriverProxy driverProxy = new DriverProxy(toDriverBuffer, clientId);
21382175

2139-
return driverProxy.TerminateDriver(tokenBuffer, tokenOffset, tokenLength);
2176+
return driverProxy.TerminateDriver(tokenBuffer, tokenOffset, tokenLength);
2177+
}
21402178
}
21412179
finally
21422180
{
@@ -2191,7 +2229,7 @@ public int SaveErrorLog(StreamWriter writer, MappedByteBuffer cncByteBuffer)
21912229
public static int PrintErrorLog(IAtomicBuffer errorBuffer, TextWriter @out)
21922230
{
21932231
int distinctErrorCount = 0;
2194-
if (ErrorLogReader.HasErrors(errorBuffer))
2232+
if (errorBuffer.Capacity > 0 && ErrorLogReader.HasErrors(errorBuffer))
21952233
{
21962234
void ErrorConsumer(int count, long firstTimestamp, long lastTimestamp, string ex)
21972235
=> @out.WriteLine(

src/Adaptive.Aeron/ChannelUri.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ public ChannelUri Media(string media)
113113
/// Is the channel <seealso cref="Media()"/> equal to <seealso cref="Aeron.Context.UDP_MEDIA"/>.
114114
/// </summary>
115115
/// <returns> true the channel <seealso cref="Media()"/> equals <seealso cref="Aeron.Context.UDP_MEDIA"/>. </returns>
116-
public bool Udp => Aeron.Context.UDP_MEDIA.Equals(_media);
116+
public bool IsUdp => Aeron.Context.UDP_MEDIA.Equals(_media);
117117

118118
/// <summary>
119119
/// Is the channel <seealso cref="Media()"/> equal to <seealso cref="Aeron.Context.IPC_MEDIA"/>.
120120
/// </summary>
121121
/// <returns> true the channel <seealso cref="Media()"/> equals <seealso cref="Aeron.Context.IPC_MEDIA"/>. </returns>
122-
public bool Ipc => Aeron.Context.IPC_MEDIA.Equals(_media);
122+
public bool IsIpc => Aeron.Context.IPC_MEDIA.Equals(_media);
123123

124124
/// <summary>
125125
/// The scheme for the URI. Must be "aeron".
@@ -473,6 +473,30 @@ public static long GetTag(string paramValue)
473473
{
474474
return IsTagged(paramValue) ? long.Parse(paramValue.Substring(4, paramValue.Length - 4)) : INVALID_TAG;
475475
}
476+
477+
/// <summary>
478+
/// Create a channel URI for a destination, i.e. a channel that uses {@code media} and {@code interface} parameters
479+
/// of the original channel and adds specified {@code endpoint} to it. For example given the input channel is
480+
/// {@code aeron:udp?mtu=1440|ttl=0|endpoint=localhost:8090|term-length=128k|interface=eth0} and the endpoint is
481+
/// {@code 192.168.0.14} the output of this method will be {@code aeron:udp?endpoint=192.168.0.14|interface=eth0}.
482+
/// </summary>
483+
/// <param name="channel"> for which the destination is being added. </param>
484+
/// <param name="endpoint"> for the target destination. </param>
485+
/// <returns> new channel URI for a destination. </returns>
486+
public static string CreateDestinationUri(string channel, string endpoint)
487+
{
488+
ChannelUri channelUri = ChannelUri.Parse(channel);
489+
string uri = AERON_PREFIX + channelUri.Media() + "?" + Aeron.Context.ENDPOINT_PARAM_NAME + "=" + endpoint;
490+
string networkInterface = channelUri.Get(Aeron.Context.INTERFACE_PARAM_NAME);
491+
492+
if (null != networkInterface)
493+
{
494+
return uri + "|" + Aeron.Context.INTERFACE_PARAM_NAME + "=" + networkInterface;
495+
}
496+
497+
return uri;
498+
}
499+
476500

477501
private static void ValidateMedia(string media)
478502
{

src/Adaptive.Aeron/ChannelUriStringBuilder.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,9 +1458,13 @@ public ChannelUriStringBuilder SpiesSimulateConnection(ChannelUri channelUri)
14581458
/// <returns> this for a fluent API. </returns>
14591459
public ChannelUriStringBuilder InitialPosition(long position, int initialTermId, int termLength)
14601460
{
1461-
if (position < 0 || 0 != (position & (FRAME_ALIGNMENT - 1)))
1461+
if (position < 0)
14621462
{
1463-
throw new ArgumentException("invalid position: " + position);
1463+
throw new ArgumentException("invalid position=" + position + " < 0");
1464+
}
1465+
if (0 != (position & (FRAME_ALIGNMENT - 1)))
1466+
{
1467+
throw new ArgumentException("invalid position=" + position + " does not have frame alignment=" + FRAME_ALIGNMENT);
14641468
}
14651469

14661470
int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);

src/Adaptive.Aeron/ClientConductor.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,6 @@ internal void OnNewPublication(
310310
);
311311

312312
_resourceByRegIdMap.Put(correlationId, publication);
313-
_asyncCommandIdSet.Remove(correlationId);
314313
}
315314

316315
internal void OnNewExclusivePublication(
@@ -342,7 +341,6 @@ internal void OnNewExclusivePublication(
342341
);
343342

344343
_resourceByRegIdMap.Put(correlationId, publication);
345-
_asyncCommandIdSet.Remove(correlationId);
346344
}
347345

348346
internal void OnNewSubscription(long correlationId, int statusIndicatorId)
@@ -575,7 +573,7 @@ internal ExclusivePublication GetExclusivePublication(long registrationId)
575573
}
576574
}
577575

578-
internal void ReleasePublication(Publication publication)
576+
internal void RemovePublication(Publication publication)
579577
{
580578
_clientLock.Lock();
581579
try
@@ -604,6 +602,44 @@ internal void ReleasePublication(Publication publication)
604602
_clientLock.Unlock();
605603
}
606604
}
605+
606+
internal void RemovePublication(long publicationRegistrationId)
607+
{
608+
_clientLock.Lock();
609+
try
610+
{
611+
if (Aeron.NULL_VALUE == publicationRegistrationId || _isTerminating || _isClosed)
612+
{
613+
return;
614+
}
615+
616+
EnsureNotReentrant();
617+
618+
object resource = _resourceByRegIdMap.Get(publicationRegistrationId);
619+
if (null != resource && !(resource is Publication))
620+
{
621+
throw new AeronException("registration id is not a Publication");
622+
}
623+
624+
Publication publication = (Publication)resource;
625+
if (null != publication)
626+
{
627+
_resourceByRegIdMap.Remove(publicationRegistrationId);
628+
publication.InternalClose();
629+
ReleaseLogBuffers(publication.LogBuffers, publication.OriginalRegistrationId, EXPLICIT_CLOSE_LINGER_NS);
630+
}
631+
632+
if (_asyncCommandIdSet.Remove(publicationRegistrationId) || null != publication)
633+
{
634+
_driverProxy.RemovePublication(publicationRegistrationId);
635+
_stashedChannelByRegistrationId.Remove(publicationRegistrationId);
636+
}
637+
}
638+
finally
639+
{
640+
_clientLock.Unlock();
641+
}
642+
}
607643

608644
internal Subscription AddSubscription(string channel, int streamId)
609645
{
@@ -634,7 +670,7 @@ internal Subscription AddSubscription(string channel, int streamId, AvailableIma
634670
}
635671
}
636672

637-
internal void ReleaseSubscription(Subscription subscription)
673+
internal void RemoveSubscription(Subscription subscription)
638674
{
639675
_clientLock.Lock();
640676
try

0 commit comments

Comments
 (0)