Skip to content

Commit 1b52686

Browse files
authored
[Streams] Fix null exceptions being propagated on downstream completion (#7497)
* [Streams] Fix null exceptions being propagated on downstream completion * Add missing exception code * Update API Approval list
1 parent 8ea9d3b commit 1b52686

File tree

4 files changed

+42
-2
lines changed

4 files changed

+42
-2
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4128,6 +4128,10 @@ namespace Akka.Streams.Implementation.Fusing
41284128
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
41294129
public override string ToString() { }
41304130
}
4131+
public class DownstreamCompletedWithNoCauseException : System.Exception
4132+
{
4133+
public DownstreamCompletedWithNoCauseException() { }
4134+
}
41314135
[Akka.Annotations.InternalApiAttribute()]
41324136
public sealed class Expand<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
41334137
{

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4102,6 +4102,10 @@ namespace Akka.Streams.Implementation.Fusing
41024102
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
41034103
public override string ToString() { }
41044104
}
4105+
public class DownstreamCompletedWithNoCauseException : System.Exception
4106+
{
4107+
public DownstreamCompletedWithNoCauseException() { }
4108+
}
41054109
[Akka.Annotations.InternalApiAttribute()]
41064110
public sealed class Expand<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
41074111
{
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="DownstreamCompletedWithNoCauseException.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
10+
namespace Akka.Streams.Implementation.Fusing;
11+
12+
public class DownstreamCompletedWithNoCauseException: Exception
13+
{
14+
public DownstreamCompletedWithNoCauseException() : base("Downstream stage/flow completed with no cause")
15+
{
16+
}
17+
}

src/core/Akka.Streams/Stage/GraphStage.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,10 +1297,25 @@ public void InternalOnDownstreamFinish(Exception cause)
12971297
{
12981298
try
12991299
{
1300-
if (cause == null)
1301-
throw new ArgumentException("Cancellation cause must not be null", nameof(cause));
13021300
if (_lastCancellationCause != null)
13031301
throw new ArgumentException("OnDownstreamFinish must not be called recursively", nameof(cause));
1302+
1303+
// Some stages might propagate null exceptions due to improper Task continuation handling
1304+
// (see https://github.com/akkadotnet/Akka.Persistence.Sql/issues/498)
1305+
// This is a stop gap solution to make sure that Akka.Streams doesn't behave improperly
1306+
// until we can fix all of those
1307+
if (cause is null)
1308+
{
1309+
try
1310+
{
1311+
throw new DownstreamCompletedWithNoCauseException();
1312+
}
1313+
catch (DownstreamCompletedWithNoCauseException e)
1314+
{
1315+
cause = e;
1316+
}
1317+
}
1318+
13041319
_lastCancellationCause = cause;
13051320
CancelStage(_lastCancellationCause);
13061321
}

0 commit comments

Comments
 (0)