Skip to content

Commit 48a704c

Browse files
authored
Ask should push unhandled answers into deadletter 2 (#5259)
* Ask should push unhandled answers into deadletter * update future handler * fix unit test * remove not needed return * remove redundant sync lock * remove redudant code and seal class * update api spec * update api spec 2 * handle of Status.Failure * ask should fail on system messages
1 parent 1f779fe commit 48a704c

File tree

4 files changed

+134
-74
lines changed

4 files changed

+134
-74
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -862,12 +862,11 @@ namespace Akka.Actor
862862
public Failures() { }
863863
public System.Collections.Generic.List<Akka.Actor.Failure> Entries { get; }
864864
}
865-
public class FutureActorRef<T> : Akka.Actor.MinimalActorRef
865+
public sealed class FutureActorRef<T> : Akka.Actor.MinimalActorRef
866866
{
867-
public FutureActorRef(System.Threading.Tasks.TaskCompletionSource<T> result, System.Action<System.Threading.Tasks.Task> unregister, Akka.Actor.ActorPath path) { }
867+
public FutureActorRef(System.Threading.Tasks.TaskCompletionSource<T> result, Akka.Actor.ActorPath path, Akka.Actor.IActorRefProvider provider) { }
868868
public override Akka.Actor.ActorPath Path { get; }
869869
public override Akka.Actor.IActorRefProvider Provider { get; }
870-
public override void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage message) { }
871870
protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { }
872871
}
873872
public class static Futures

src/core/Akka.Tests/Actor/AskSpec.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using Akka.Util.Internal;
1616
using FluentAssertions;
1717
using Nito.AsyncEx;
18+
using Akka.Dispatch.SysMsg;
1819

1920
namespace Akka.Tests.Actor
2021
{
@@ -37,6 +38,29 @@ protected override void OnReceive(object message)
3738
{
3839
Sender.Tell("answer");
3940
}
41+
42+
if (message.Equals("delay"))
43+
{
44+
Thread.Sleep(3000);
45+
Sender.Tell("answer");
46+
}
47+
48+
if (message.Equals("many"))
49+
{
50+
Sender.Tell("answer1");
51+
Sender.Tell("answer2");
52+
Sender.Tell("answer2");
53+
}
54+
55+
if (message.Equals("invalid"))
56+
{
57+
Sender.Tell(123);
58+
}
59+
60+
if (message.Equals("system"))
61+
{
62+
Sender.Tell(new DummySystemMessage());
63+
}
4064
}
4165
}
4266

@@ -83,6 +107,10 @@ protected override void OnReceive(object message)
83107
}
84108
}
85109

110+
public sealed class DummySystemMessage : ISystemMessage
111+
{
112+
}
113+
86114
[Fact]
87115
public async Task Can_Ask_Response_actor()
88116
{
@@ -114,6 +142,60 @@ public async Task Can_get_timeout_when_asking_actor()
114142
await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)));
115143
}
116144

145+
[Fact]
146+
public async Task Ask_should_put_timeout_answer_into_deadletter()
147+
{
148+
var actor = Sys.ActorOf<SomeActor>();
149+
150+
await EventFilter.DeadLetter<object>().ExpectOneAsync(TimeSpan.FromSeconds(5), async () =>
151+
{
152+
await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("delay", TimeSpan.FromSeconds(1)));
153+
});
154+
}
155+
156+
[Fact]
157+
public async Task Ask_should_put_too_many_answers_into_deadletter()
158+
{
159+
var actor = Sys.ActorOf<SomeActor>();
160+
161+
await EventFilter.DeadLetter<object>().ExpectAsync(2, async () =>
162+
{
163+
var result = await actor.Ask<string>("many", TimeSpan.FromSeconds(1));
164+
result.ShouldBe("answer1");
165+
});
166+
}
167+
168+
[Fact]
169+
public async Task Ask_should_not_put_canceled_answer_into_deadletter()
170+
{
171+
var actor = Sys.ActorOf<SomeActor>();
172+
173+
await EventFilter.DeadLetter<object>().ExpectAsync(0, async () =>
174+
{
175+
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
176+
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("delay", Timeout.InfiniteTimeSpan, cts.Token));
177+
});
178+
}
179+
180+
[Fact]
181+
public async Task Ask_should_put_invalid_answer_into_deadletter()
182+
{
183+
var actor = Sys.ActorOf<SomeActor>();
184+
185+
await EventFilter.DeadLetter<object>().ExpectOne(async () =>
186+
{
187+
await Assert.ThrowsAsync<ArgumentException>(async () => await actor.Ask<string>("invalid", TimeSpan.FromSeconds(1)));
188+
});
189+
}
190+
191+
[Fact]
192+
public async Task Ask_should_fail_on_system_message()
193+
{
194+
var actor = Sys.ActorOf<SomeActor>();
195+
196+
await Assert.ThrowsAsync<InvalidOperationException>(async () => await actor.Ask<ISystemMessage>("system", TimeSpan.FromSeconds(1)));
197+
}
198+
117199
[Fact]
118200
public async Task Can_cancel_when_asking_actor()
119201
{

src/core/Akka/Actor/ActorRef.cs

Lines changed: 44 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -69,51 +69,34 @@ public interface IRepointableRef : IActorRefScope
6969
///
7070
/// ActorRef implementation used for one-off tasks.
7171
/// </summary>
72-
public class FutureActorRef<T> : MinimalActorRef
72+
public sealed class FutureActorRef<T> : MinimalActorRef
7373
{
7474
private readonly TaskCompletionSource<T> _result;
7575
private readonly ActorPath _path;
76+
private readonly IActorRefProvider _provider;
7677

7778
/// <summary>
7879
/// INTERNAL API
7980
/// </summary>
8081
/// <param name="result">TBD</param>
81-
/// <param name="unregister">TBD</param>
8282
/// <param name="path">TBD</param>
83-
public FutureActorRef(TaskCompletionSource<T> result, Action<Task> unregister, ActorPath path)
83+
/// <param name="provider">TBD</param>
84+
public FutureActorRef(TaskCompletionSource<T> result, ActorPath path, IActorRefProvider provider)
8485
{
85-
if (ActorCell.Current != null)
86-
{
87-
_actorAwaitingResultSender = ActorCell.Current.Sender;
88-
}
8986
_result = result;
9087
_path = path;
91-
92-
_result.Task.ContinueWith(unregister);
88+
_provider = provider;
9389
}
9490

9591
/// <summary>
9692
/// TBD
9793
/// </summary>
98-
public override ActorPath Path
99-
{
100-
get { return _path; }
101-
}
94+
public override ActorPath Path => _path;
10295

10396
/// <summary>
10497
/// TBD
10598
/// </summary>
106-
/// <exception cref="System.NotImplementedException">TBD</exception>
107-
public override IActorRefProvider Provider
108-
{
109-
get { throw new NotImplementedException(); }
110-
}
111-
112-
113-
private const int INITIATED = 0;
114-
private const int COMPLETED = 1;
115-
private int status = INITIATED;
116-
private readonly IActorRef _actorAwaitingResultSender;
99+
public override IActorRefProvider Provider => _provider;
117100

118101
/// <summary>
119102
/// TBD
@@ -122,43 +105,36 @@ public override IActorRefProvider Provider
122105
/// <param name="sender">TBD</param>
123106
protected override void TellInternal(object message, IActorRef sender)
124107
{
108+
var handled = false;
125109

126-
if (message is ISystemMessage sysM) //we have special handling for system messages
127-
{
128-
SendSystemMessage(sysM);
129-
}
130-
else
110+
switch (message)
131111
{
132-
if (Interlocked.Exchange(ref status, COMPLETED) == INITIATED)
133-
{
134-
if (message is T t)
135-
{
136-
_result.TrySetResult(t);
137-
}
138-
else if (message == null) //special case: https://github.com/akkadotnet/akka.net/issues/5204
139-
{
140-
_result.TrySetResult(default);
141-
}
142-
else if (message is Failure f)
143-
{
144-
_result.TrySetException(f.Exception ?? new TaskCanceledException("Task cancelled by actor via Failure message."));
145-
}
146-
else
147-
{
148-
_result.TrySetException(new ArgumentException(
149-
$"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]"));
150-
}
151-
}
112+
case ISystemMessage msg:
113+
handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}"));
114+
break;
115+
case T t:
116+
handled = _result.TrySetResult(t);
117+
break;
118+
case null:
119+
handled = _result.TrySetResult(default);
120+
break;
121+
case Status.Failure f:
122+
handled = _result.TrySetException(f.Cause
123+
?? new TaskCanceledException("Task cancelled by actor via Failure message."));
124+
break;
125+
case Failure f:
126+
handled = _result.TrySetException(f.Exception
127+
?? new TaskCanceledException("Task cancelled by actor via Failure message."));
128+
break;
129+
default:
130+
_ = _result.TrySetException(new ArgumentException(
131+
$"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]"));
132+
break;
152133
}
153-
}
154-
155-
/// <summary>
156-
/// TBD
157-
/// </summary>
158-
/// <param name="message">TBD</param>
159-
public override void SendSystemMessage(ISystemMessage message)
160-
{
161-
base.SendSystemMessage(message);
134+
135+
//ignore canceled ask and put unhandled answers into deadletter
136+
if (!handled && !_result.Task.IsCanceled)
137+
_provider.DeadLetters.Tell(message ?? default(T), this);
162138
}
163139
}
164140

@@ -727,16 +703,16 @@ public abstract class ActorRefWithCell : InternalActorRefBase
727703
private IEnumerable<IActorRef> SelfAndChildren()
728704
{
729705
yield return this;
730-
foreach(var child in Children.SelectMany(x =>
731-
{
732-
switch(x)
733-
{
734-
case ActorRefWithCell cell:
735-
return cell.SelfAndChildren();
736-
default:
737-
return new[] { x };
738-
}
739-
}))
706+
foreach (var child in Children.SelectMany(x =>
707+
{
708+
switch (x)
709+
{
710+
case ActorRefWithCell cell:
711+
return cell.SelfAndChildren();
712+
default:
713+
return new[] { x };
714+
}
715+
}))
740716
{
741717
yield return child;
742718
}

src/core/Akka/Actor/Futures.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,19 @@ public static Task<T> Ask<T>(this ICanTell self, Func<IActorRef, object> message
149149
}
150150

151151
//create a new tempcontainer path
152-
ActorPath path = provider.TempPath();
152+
var path = provider.TempPath();
153153

154-
var future = new FutureActorRef<T>(result, t =>
154+
var future = new FutureActorRef<T>(result, path, provider);
155+
156+
//The future actor needs to be unregistered in the temp container
157+
_ = result.Task.ContinueWith(t =>
155158
{
156159
provider.UnregisterTempActor(path);
157160

158161
ctr1?.Dispose();
159162
ctr2?.Dispose();
160163
timeoutCancellation?.Dispose();
161-
}, path);
164+
}, TaskContinuationOptions.ExecuteSynchronously);
162165

163166
//The future actor needs to be registered in the temp container
164167
provider.RegisterTempActor(future, path);

0 commit comments

Comments
 (0)