diff --git a/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
new file mode 100644
index 00000000000..c942e16d4d9
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
@@ -0,0 +1,119 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2024 Lightbend Inc.
+// // Copyright (C) 2013-2024 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class SelectAsyncBenchmarks
+{
+ public readonly struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncCh;
+ private Task selectAsyncSyncStub;
+ private Channel asyncChSync;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncCh = Channel.CreateUnbounded();
+
+ asyncChSync = Channel.CreateUnbounded();
+
+
+ selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader)
+ .SelectAsync(4, a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ }
+ else
+ {
+ }
+
+ return Task.FromResult(NotUsed.Instance);
+ }).RunWith(Sink.Ignore(), materializer);
+
+
+ selectAsyncStub = Source.ChannelReader(asyncCh.Reader)
+ .SelectAsync(4, async a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ }
+ else
+ {
+ //await Task.Yield();
+ await Task.Delay(0);
+ }
+
+ return NotUsed.Instance;
+ }).RunWith(Sink.Ignore(), materializer);
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ }
+
+ [Benchmark]
+ public async Task RunSelectAsync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task RunSelectAsyncSync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs
new file mode 100644
index 00000000000..5bf3bf0504c
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs
@@ -0,0 +1,122 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2024 Lightbend Inc.
+// // Copyright (C) 2013-2024 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class UnfoldAsyncBenchmarks
+{
+ public readonly struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncNoYieldCh;
+ private Task selectValueTaskAsyncStub;
+ private Task unfoldAsyncSyncStub;
+ private Task selectAsyncValueTaskSyncStub;
+ private Channel asyncYieldCh;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncNoYieldCh = Channel.CreateUnbounded();
+
+ asyncYieldCh = Channel.CreateUnbounded();
+
+
+ unfoldAsyncSyncStub = Source.UnfoldAsync,int>(asyncYieldCh.Reader, async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ return (r, i.IntValue);
+ }
+ })
+ .RunWith(Sink.Ignore(), materializer);
+
+ selectAsyncStub = Source.UnfoldAsync,int>(asyncNoYieldCh.Reader,async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ return (r, a.IntValue);
+ }
+ }).RunWith(Sink.Ignore(), materializer);
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ }
+
+ [Benchmark]
+ public async Task UnfoldAsyncYieldInConsume()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task UnfoldAsyncYieldInPush()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs
new file mode 100644
index 00000000000..2bec31825fe
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs
@@ -0,0 +1,230 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2024 Lightbend Inc.
+// Copyright (C) 2013-2024 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using Akka.Streams.Implementation.Fusing;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class UnfoldResourceAsyncBenchmarks
+{
+
+ public readonly struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncNoYieldCh;
+ private Task unfoldAsyncSyncStub;
+ private Channel asyncYieldCh;
+ private Channel straightCh;
+ private Task straightTask;
+ private CancellationTokenSource straightChTokenSource;
+ private Channel straightYieldCh;
+ private Task straightYieldTask;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncNoYieldCh = Channel.CreateUnbounded();
+
+ asyncYieldCh = Channel.CreateUnbounded();
+
+
+ unfoldAsyncSyncStub = Source.UnfoldResourceAsync>(()=> Task.FromResult(asyncYieldCh.Reader), async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ return i.IntValue;
+ }
+ }, (r)=> Task.FromResult(Done.Instance))
+ .RunWith(Sink.Ignore(), materializer);
+
+ selectAsyncStub = Source.UnfoldResourceAsync>(()=>Task.FromResult(asyncNoYieldCh.Reader),async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ // await Task.Delay(0);
+ return a.IntValue;
+ }
+ }, (r)=> Task.FromResult(Done.Instance) ).RunWith(Sink.Ignore(), materializer);
+
+
+ straightChTokenSource = new CancellationTokenSource();
+ straightCh = Channel.CreateUnbounded();
+
+
+ straightTask = Task.Run(async () =>
+ {
+ static async IAsyncEnumerable GetEnumerator(
+ ChannelReader reader, CancellationToken token)
+ {
+ while (token.IsCancellationRequested == false)
+ {
+ await Task.Yield();
+ var a = await reader.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ yield return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ yield return a.IntValue;
+ }
+ }
+ }
+ var r = straightCh.Reader;
+ await foreach (var v in GetEnumerator(r,straightChTokenSource.Token))
+ {
+
+ }
+ });
+
+ straightYieldCh = Channel.CreateUnbounded();
+
+
+ straightYieldTask = Task.Run(async () =>
+ {
+ static async IAsyncEnumerable GetEnumerator(
+ ChannelReader reader, CancellationToken token)
+ {
+ while (token.IsCancellationRequested == false)
+ {
+ var a = await reader.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ yield return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ yield return a.IntValue;
+ }
+ }
+ }
+ var r = straightYieldCh.Reader;
+ await foreach (var v in GetEnumerator(r,straightChTokenSource.Token))
+ {
+
+ }
+ });
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ straightChTokenSource.Cancel();
+ }
+
+ [Benchmark]
+ public async Task UnfoldResourceAsyncNoYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task UnfoldResourceAsyncWithYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+
+ [Benchmark]
+ public async Task StraightChannelReadNoYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ straightCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ straightCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task StraightChannelReadWithYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ straightYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ straightYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs
index dba48031db8..412279e57df 100644
--- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs
+++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs
@@ -6,6 +6,7 @@
//-----------------------------------------------------------------------
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -98,11 +99,37 @@ public override string ToString() => "GraphModule\n" +
[InternalApi]
public sealed class GraphInterpreterShell
{
+ internal sealed class
+ BoxedBoundaryEvent : ActorGraphInterpreter.IBoundaryEvent
+ where T : struct, ActorGraphInterpreter.IBoundaryEvent
+ {
+ public T Boxed { get; private set; }
+ public BoxedBoundaryEvent()
+ {
+
+ }
+ public BoxedBoundaryEvent SetBox(T value)
+ {
+ Boxed = value;
+ return this;
+ }
+
+ public GraphInterpreterShell Shell => Boxed.Shell;
+ }
+
private readonly GraphAssembly _assembly;
private readonly Connection[] _connections;
private readonly GraphStageLogic[] _logics;
private readonly Shape _shape;
private readonly ActorMaterializerSettings _settings;
+
+ private readonly ObjectPoolStuff.ObjectPoolV2> _onNextPool =
+ new (Environment.ProcessorCount*2);
+ private readonly ObjectPoolStuff.ObjectPoolV2> _requestMorePool =
+ new (Environment.ProcessorCount*2);
+ private readonly ObjectPoolStuff.ObjectPoolV2>
+ _asyncInputPool = new (Environment.ProcessorCount*2);
+ private readonly ActorGraphInterpreter.Resume _resume;
///
/// TBD
///
@@ -127,7 +154,6 @@ public sealed class GraphInterpreterShell
private readonly int _abortLimit;
private readonly ActorGraphInterpreter.BatchingActorInputBoundary[] _inputs;
private readonly ActorGraphInterpreter.IActorOutputBoundary[] _outputs;
-
private ILoggingAdapter _log;
private GraphInterpreter _interpreter;
private int _subscribersPending;
@@ -136,8 +162,7 @@ public sealed class GraphInterpreterShell
private bool _waitingForShutdown;
private Action