Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
using Akka.Util;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor.Dispatch
{
public abstract class ActorModelSpec : AkkaSpec
{
protected ActorModelSpec(Config hocon) : base(hocon) { }
private readonly ITestOutputHelper _testOutputHelper;
protected ActorModelSpec(Config hocon, ITestOutputHelper output = null) : base(hocon, output)
{
_testOutputHelper = output;
}

interface IActorModelMessage : INoSerializationVerificationNeeded { }

Expand Down Expand Up @@ -168,6 +173,13 @@ private DoubleStop() { }
public static readonly DoubleStop Instance = new DoubleStop();
}

private class GetStats : IActorModelMessage
{
private GetStats(){}

public static readonly GetStats Instance = new GetStats();
}

sealed class ThrowException : IActorModelMessage
{
public ThrowException(Exception e)
Expand All @@ -184,7 +196,7 @@ public ThrowException(Exception e)
class DispatcherActor : ReceiveActor
{
private Switch _busy = new Switch(false);

private readonly ILoggingAdapter _log = Context.GetLogger();
private MessageDispatcherInterceptor _interceptor = Context.Dispatcher.AsInstanceOf<MessageDispatcherInterceptor>();

private void Ack()
Expand Down Expand Up @@ -222,6 +234,11 @@ public DispatcherActor()
Receive<InterruptNicely>(interrupt => { Ack(); Sender.Tell(interrupt.Expect); _busy.SwitchOff(); });
Receive<ThrowException>(throwEx => { Ack(); _busy.SwitchOff(); throw throwEx.E; }, throwEx => true);
Receive<DoubleStop>(doubleStop => { Ack(); Context.Stop(Self); Context.Stop(Self); _busy.SwitchOff(); });
Receive<GetStats>(stats => {
Ack();
Sender.Tell(_interceptor.GetStats(Self));
_busy.SwitchOff();
});
}
}

Expand Down Expand Up @@ -474,7 +491,7 @@ public void A_dispatcher_must_process_messages_one_at_a_time()
AssertRefDefaultZero(a, registers: 1, msgsReceived: 3, msgsProcessed: 3, unregisters: 1, dispatcher: dispatcher);
}

[Fact(Skip = "Racy on Azure DevOps")]
[Fact]
public void A_dispatcher_must_handle_queuing_from_multiple_threads()
{
var dispatcher = InterceptedDispatcher();
Expand All @@ -487,14 +504,26 @@ public void A_dispatcher_must_handle_queuing_from_multiple_threads()
{
foreach (var c in Enumerable.Range(1, 20))
{
a.Tell(new WaitAck(1, counter));
a.Tell(new CountDown(counter));
}
});
}

AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds, "Should process 200 messages");
AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200);
Sys.Stop(a);
try
{
AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds,
"Should process 200 messages");
AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200);
}
finally
{
var stats = a.Ask<InterceptorStats>(GetStats.Instance).Result;
_testOutputHelper.WriteLine("Observed stats: {0}", stats);

Sys.Stop(a);
}


}

[Fact]
Expand Down Expand Up @@ -643,7 +672,7 @@ public class DispatcherModelSpec : ActorModelSpec

";

public DispatcherModelSpec() : base(DispatcherHocon) { }
public DispatcherModelSpec(ITestOutputHelper output) : base(DispatcherHocon, output) { }

protected override MessageDispatcherInterceptor InterceptedDispatcher()
{
Expand Down