diff --git a/src/benchmark/Akka.Benchmarks/IO/TcpOperationsBenchmarks.cs b/src/benchmark/Akka.Benchmarks/IO/TcpOperationsBenchmarks.cs index 4376e213cb3..d106df23704 100644 --- a/src/benchmark/Akka.Benchmarks/IO/TcpOperationsBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/IO/TcpOperationsBenchmarks.cs @@ -191,7 +191,7 @@ public EchoConnection(IActorRef connection, int messageSize) } } - private class ClientCoordinator : ReceiveActor, IWithTimers, IWithStash + private class ClientCoordinator : ReceiveActor, IWithStash { private readonly IActorRef _echoServer; private readonly HashSet _waitingChildren = new(); @@ -211,6 +211,7 @@ public ClientCoordinator(IActorRef echoServer, int clientsCount) { _echoServer = echoServer; _clientsCount = clientsCount; + Timers = Context.Timers; Receive(endpoint => { @@ -281,7 +282,7 @@ protected override void PreStart() _echoServer.Tell(GetBindAddress.Instance); } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public IStash Stash { get; set; } } @@ -322,7 +323,7 @@ public IEnumerable Deframe(ByteString data) } } - private class Client : ReceiveActor, IWithTimers + private class Client : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); private int _receivedCount = 0; @@ -344,6 +345,8 @@ private RetryConnect() public Client(EndPoint endpoint, int messagesToSend, byte[] message) { _framer = new Framer(message.Length); + Timers = Context.Timers; + var write = // create the write only once Tcp.Write.Create(ByteString.FromBytes(message)); @@ -409,7 +412,7 @@ private static void DoConnect(EndPoint endpoint) Context.System.Tcp().Tell(new Tcp.Connect(endpoint, timeout: TimeSpan.FromSeconds(5))); } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs index 2583296f5c0..de8d03455f1 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs @@ -77,7 +77,7 @@ public SetStore(IActorRef store) => /// /// A journal that delegates actual storage to a target actor. For testing only. /// - public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash, IWithTimers + public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash { private const string InitTimeoutTimerKey = nameof(InitTimeoutTimerKey); @@ -99,6 +99,7 @@ protected AsyncWriteProxyEx() _isInitialized = false; _isInitTimedOut = false; _store = null; + Timers = Context.Timers; } /// @@ -257,7 +258,7 @@ private Task StoreNotInitialized() /// public IStash Stash { get; set; } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } } /// diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs index ff9e8a520e8..fd6fa4bce40 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs @@ -123,7 +123,7 @@ protected override bool Receive(object message) } // slow stop previously made it more likely that the coordinator would stop before the local region - public class SlowStopShardedEntity : ActorBase, IWithTimers + public class SlowStopShardedEntity : ActorBase { #region StopMessage public class Stop @@ -143,8 +143,6 @@ private ActualStop() } } - public ITimerScheduler Timers { get; set; } - #region DelayedStop protected override bool Receive(object message) { @@ -154,7 +152,7 @@ protected override bool Receive(object message) Sender.Tell(id); return true; case Stop _: - Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50)); + Context.Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50)); return true; case ActualStop _: Context.Stop(Self); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs index 1c7c4f89b72..2b1ef6352c6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs @@ -15,13 +15,14 @@ namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery; /// /// INTERNAL API /// -public sealed class SlowStopConsumerEntity : ReceiveActor, IWithTimers +public sealed class SlowStopConsumerEntity : ReceiveActor { private readonly IActorRef _consumerController; public SlowStopConsumerEntity(string persistenceId, IActorRef consumerController) { _consumerController = consumerController; + Timers = Context.Timers; Receive>(delivery => { @@ -57,5 +58,5 @@ private ActualStop() { } public sealed record Job(int Payload, IActorRef Probe); - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs index 47e310555c1..532e0c089f0 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs @@ -12,7 +12,7 @@ namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery; -internal class TestShardingProducer : ReceiveActor, IWithTimers, IWithStash +internal class TestShardingProducer : ReceiveActor, IWithStash { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly IActorRef _producerController; @@ -22,10 +22,11 @@ public TestShardingProducer(IActorRef producerController, IActorRef probe) { _producerController = producerController; _probe = probe; + Timers = Context.Timers; Idle(); } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } public IStash Stash { get; set; } = null!; public IActorRef SendNext { get; set; } = ActorRefs.Nobody; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs index a7964da8924..ca8123faa1c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs @@ -16,7 +16,7 @@ namespace Akka.Cluster.Sharding.Tests { - public abstract class SnapshotStoreProxy : SnapshotStore, IWithUnboundedStash, IWithTimers + public abstract class SnapshotStoreProxy : SnapshotStore, IWithUnboundedStash { private const string TimeoutTimerKey = nameof(TimeoutTimerKey); @@ -39,6 +39,7 @@ protected SnapshotStoreProxy() _isInitialized = false; _isInitTimedOut = false; _store = null; + Timers = Context.Timers; } /// @@ -51,7 +52,7 @@ protected SnapshotStoreProxy() /// public IStash Stash { get; set; } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } /// /// TBD diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 4f79a40454e..7d31ab8e97c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -541,7 +541,7 @@ protected override void PreStart() } } - private class TestShardingProducer : ReceiveActor, IWithTimers + private class TestShardingProducer : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly IActorRef _producerController; @@ -553,8 +553,6 @@ public TestShardingProducer(IActorRef producerController) Idle(0); } - public ITimerScheduler Timers { get; set; } = null!; - protected override void PreStart() { var self = Self; @@ -570,7 +568,7 @@ protected override void PreStart() }, "sendNextAdapter"); // simulate fast producer - Timers.StartPeriodicTimer("tick", Tick.Instance, TimeSpan.FromMilliseconds(20)); + Context.Timers.StartPeriodicTimer("tick", Tick.Instance, TimeSpan.FromMilliseconds(20)); } private void Idle(int n) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesFailureSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesFailureSpec.cs index cadf0248841..5ed758a9d23 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesFailureSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesFailureSpec.cs @@ -165,7 +165,7 @@ public Props CoordinatorStoreProps() } } - private class FakeShardStoreActor : ActorBase, IWithTimers + private class FakeShardStoreActor : ActorBase { public static Props Props(string shardId) => Actor.Props.Create(() => new FakeShardStoreActor(shardId)); @@ -202,10 +202,11 @@ public Delayed(IActorRef replyTo, object msg) public FakeShardStoreActor(string shardId) { this.shardId = shardId; + Timers = Context.Timers; Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId)); } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } //protected override void PreStart() //{ @@ -277,7 +278,7 @@ protected override bool Receive(object message) } } - private class FakeCoordinatorStoreActor : ActorBase, IWithTimers + private class FakeCoordinatorStoreActor : ActorBase { public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor()); @@ -306,10 +307,11 @@ public ClearFailShard(string shardId) private readonly ILoggingAdapter log = Context.GetLogger(); private ImmutableDictionary failAddShard = ImmutableDictionary.Empty; - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public FakeCoordinatorStoreActor() { + Timers = Context.Timers; Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self)); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesSupervisionStrategyDecisionSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesSupervisionStrategyDecisionSpec.cs index c9381f14782..2bc901e8cfa 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesSupervisionStrategyDecisionSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesSupervisionStrategyDecisionSpec.cs @@ -172,7 +172,7 @@ public Props CoordinatorStoreProps() } } - private class FakeShardStoreActor : ActorBase, IWithTimers + private class FakeShardStoreActor : ActorBase { public static Props Props(string shardId) => Actor.Props.Create(() => new FakeShardStoreActor(shardId)); @@ -185,8 +185,6 @@ public FakeShardStoreActor(string shardId) Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId)); } - public ITimerScheduler Timers { get; set; } - protected override bool Receive(object message) { switch (message) @@ -202,14 +200,12 @@ protected override bool Receive(object message) } } - private class FakeCoordinatorStoreActor : ActorBase, IWithTimers + private class FakeCoordinatorStoreActor : ActorBase { public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor()); private readonly ILoggingAdapter _log = Context.GetLogger(); - public ITimerScheduler Timers { get; set; } - public FakeCoordinatorStoreActor() { Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self)); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs index c926eac353b..4004bd7da77 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs @@ -24,7 +24,7 @@ namespace Akka.Cluster.Sharding /// /// Singleton coordinator (with state based on ddata) that decides where to allocate shards. /// - internal sealed class DDataShardCoordinator : ActorBase, IWithTimers, IWithUnboundedStash + internal sealed class DDataShardCoordinator : ActorBase, IWithUnboundedStash { private sealed class RememberEntitiesStoreStopped @@ -90,7 +90,7 @@ internal static Props Props( private readonly IActorRef? _rememberEntitiesStore; private readonly bool _rememberEntities; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } public IStash Stash { get; set; } = null!; private string TypeName => _baseImpl.TypeName; @@ -130,6 +130,7 @@ public DDataShardCoordinator( _initEmptyState = CoordinatorState.Empty.WithRememberEntities(settings.RememberEntities); + Timers = Context.Timers; if (rememberEntitiesStoreProvider != null) { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index 57ab5ca612d..79f2e2eb776 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -20,7 +20,7 @@ namespace Akka.Cluster.Sharding.Delivery.Internal; /// INTERNAL API /// /// The types of messages handled by the ConsumerController -internal class ShardingConsumerController : ReceiveActor, IWithStash, IWithTimers +internal class ShardingConsumerController : ReceiveActor, IWithStash { private const string ShutdownTimeoutTimerKey = nameof(ShutdownTimeoutTimerKey); @@ -35,6 +35,7 @@ public ShardingConsumerController(Func consumerProps, { ConsumerProps = consumerProps; Settings = settings; + Timers = Context.Timers; WaitForStart(); } @@ -257,5 +258,5 @@ protected override void PreStart() } public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs index b2d4df577e5..961b06ca44f 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs @@ -27,7 +27,7 @@ namespace Akka.Cluster.Sharding.Delivery.Internal; using TotalSeqNr = Int64; using OutSeqNr = Int64; -internal sealed class ShardingProducerController : ReceiveActor, IWithStash, IWithTimers +internal sealed class ShardingProducerController : ReceiveActor, IWithStash { public string ProducerId { get; } @@ -48,7 +48,7 @@ internal sealed class ShardingProducerController : ReceiveActor, IWithStash, private readonly ILoggingAdapter _log = Context.GetLogger(); public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } public ShardingProducerController(string producerId, IActorRef shardRegion, Option durableQueueProps, ShardingProducerController.Settings settings, ITimeProvider? timeProvider = null) @@ -59,6 +59,7 @@ public ShardingProducerController(string producerId, IActorRef shardRegion, Opti Settings = settings; _timeProvider = timeProvider ?? Context.System.Scheduler; + Timers = Context.Timers; WaitingForStart(Option.None, CreateInitialState(_durableQueueProps.HasValue)); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs index f29ef82134b..66eb760b4db 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs @@ -16,7 +16,7 @@ namespace Akka.Cluster.Sharding.Internal using EntityId = String; using ShardId = String; - internal class RememberEntityStarter : ActorBase, IWithTimers + internal class RememberEntityStarter : ActorBase { public static Props Props( IActorRef region, @@ -90,10 +90,13 @@ public RememberEntityStarter( OnStartBatch(settings.TuningParameters.EntityRecoveryConstantRateStrategyNumberOfEntities); break; } + + Timers = Context.Timers; + Timers.StartPeriodicTimer("retry", ResendUnAcked.Instance, settings.TuningParameters.RetryInterval); } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } public ILoggingAdapter Log { get; } = Context.GetLogger(); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 6d9d3b2de5f..770fb93eaa4 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -34,7 +34,7 @@ namespace Akka.Cluster.Sharding /// responsible for. /// [InternalStableApi] - internal sealed class Shard : ActorBase, IWithTimers, IWithUnboundedStash + internal sealed class Shard : ActorBase, IWithUnboundedStash { #region messages @@ -971,7 +971,7 @@ public override string ToString() public ILoggingAdapter Log { get; } = Context.GetLogger(); public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } public Shard( string typeName, @@ -992,6 +992,7 @@ public Shard( _verboseDebug = Context.System.Settings.Config.GetBoolean("akka.cluster.sharding.verbose-debug-logging"); + Timers = Context.Timers; if (rememberEntitiesProvider != null) { var store = Context.ActorOf(rememberEntitiesProvider.ShardStoreProps(shardId).WithDeploy(Deploy.Local), diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs index 1b39526b6cf..2c6b52b09b5 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs @@ -1450,7 +1450,7 @@ public RebalanceResult(IImmutableSet shards) /// . If the process takes longer than the `handOffTimeout` it /// also sends . /// - internal class RebalanceWorker : ActorBase, IWithTimers + internal class RebalanceWorker : ActorBase { public sealed class ShardRegionTerminated { @@ -1490,7 +1490,7 @@ public static Props Props( private ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } /// /// TBD @@ -1529,6 +1529,7 @@ public RebalanceWorker( shardRegionFrom, regions.Count()); + Timers = Context.Timers; Timers.StartSingleTimer("hand-off-timeout", ReceiveTimeout.Instance, handOffTimeout); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 7cc92c2cf14..6b9cc570e19 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -34,7 +34,7 @@ namespace Akka.Cluster.Sharding /// actor on other nodes. /// [InternalStableApi] - public sealed class ShardRegion : ActorBase, IWithTimers + public sealed class ShardRegion : ActorBase { #region messages @@ -220,7 +220,7 @@ public override int GetHashCode() /// them have terminated it replies with . /// If the entities don't terminate after `handoffTimeout` it will try stopping them forcefully. /// - internal class HandOffStopper : ReceiveActor, IWithTimers + internal class HandOffStopper : ReceiveActor { private sealed class StopTimeout { @@ -244,7 +244,7 @@ private StopTimeoutWarning() public ILoggingAdapter Log { get; } = Context.GetLogger(); - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } /// /// TBD @@ -289,6 +289,7 @@ public HandOffStopper( TimeSpan handoffTimeout) { var remaining = new HashSet(entities); + Timers = Context.Timers; Receive(t => { @@ -466,6 +467,8 @@ public ShardRegion( _nextRegistrationDelay = _initRegistrationDelay; _bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter; + + Timers = Context.Timers; SetupCoordinatedShutdown(); } @@ -487,7 +490,7 @@ private void SetupCoordinatedShutdown() }); } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } /// /// When leaving the coordinator singleton is started rather quickly on next diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs index 9d8b3230d31..c3506d4f42a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -16,7 +16,7 @@ namespace Akka.Cluster.Sharding { - internal sealed class KeepAlivePinger : UntypedActor, IWithTimers + internal sealed class KeepAlivePinger : UntypedActor { private sealed class Tick { @@ -32,7 +32,7 @@ private Tick() public IActorRef ShardingRef { get; } public ShardedDaemonProcessSettings Settings { get; } - public ITimerScheduler Timers { get; set; } = null!; // gets set by Akka.NET + private ITimerScheduler Timers { get; } public static Props Props(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) => @@ -45,6 +45,7 @@ public KeepAlivePinger(ShardedDaemonProcessSettings settings, string name, strin Name = name; Identities = identities; ShardingRef = shardingRef; + Timers = Context.Timers; } protected override void PreStart() diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs index 6527692b3e1..29225c6cfb4 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -21,7 +21,7 @@ #nullable enable namespace Akka.Cluster.Tools.Client; -public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash, IWithTimers +public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash { #region Discovery messages @@ -114,11 +114,13 @@ public ClusterClientDiscovery(ClusterClientSettings settings) _verboseLogging = _settings.VerboseLogging; + Timers = Context.Timers; + Become(Discovering); } public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } protected override void OnReceive(object message) { diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 8bba912fc30..370e4da09a8 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -99,7 +99,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe /// replies. /// /// - public class DistributedPubSubMediator : ReceiveActor, IWithTimers + public class DistributedPubSubMediator : ReceiveActor { private const string GossipTimerKey = "GossipTimer"; private const string PruneTimerKey = "PruneTimer"; @@ -134,7 +134,7 @@ public static Props Props(DistributedPubSubSettings settings) private readonly Dictionary> _bufferedMessages = new(); private readonly List _newlyAddedKeys = new(); - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } /// /// Transforms the local bucket registry dictionary into a dictionary of topic key and version number pairs @@ -176,6 +176,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _topicPrefix = Self.Path.ToStringWithoutAddress(); _cache = new PubSubCache(); + Timers = Context.Timers; + Receive(send => { var routees = new List(); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index cf1cc3b1843..872816901f4 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -41,7 +41,7 @@ public override string ToString() /// /// Base class for both topics and groups. /// - internal abstract class TopicLike : ActorBase, IWithTimers + internal abstract class TopicLike : ActorBase { private const string PruneTimerKey = "PruneTimer"; @@ -87,9 +87,10 @@ protected TopicLike(TimeSpan emptyTimeToLive, bool sendToDeadLettersWhenNone) EmptyTimeToLive = emptyTimeToLive; SendToDeadLettersWhenNoSubscribers = sendToDeadLettersWhenNone; PruneInterval = new TimeSpan(emptyTimeToLive.Ticks / 2); + Timers = Context.Timers; } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } protected override void PreStart() { diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index faef720aedf..519173529f7 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -37,7 +37,7 @@ namespace Akka.Cluster.Tools.Singleton /// Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the actors involved. /// /// - public sealed class ClusterSingletonProxy : ReceiveActor, IWithTimers + public sealed class ClusterSingletonProxy : ReceiveActor { /// /// TBD @@ -142,6 +142,7 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS _memberAgeComparer = Member.AgeOrdering; _membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + Timers = Context.Timers; Receive(s => HandleInitial(s)); Receive(m => Add(m.Member)); @@ -226,7 +227,7 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS }); } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } private ILoggingAdapter Log => _log ??= Context.GetLogger(); diff --git a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs index 8661538208b..7d3c6f8c3f7 100644 --- a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs +++ b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs @@ -36,7 +36,7 @@ namespace Akka.DistributedData.LightningDB /// to the durable store actor, which must then reply with the or /// to the . /// - public sealed class LmdbDurableStore : ReceiveActor, IWithTimers + public sealed class LmdbDurableStore : ReceiveActor { public static Actor.Props Props(Config config) => Actor.Props.Create(() => new LmdbDurableStore(config)); @@ -88,6 +88,7 @@ useWriteBehind is "off" or "false" or "no" ? _log.Warning("No directory path configured for LMDB durable store, using default path"); _path = DatabaseName; } + Timers = Context.Timers; Init(); } @@ -275,6 +276,6 @@ private void DoWriteBehind() } } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index 0236d017068..92ae3da3cdc 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -256,13 +256,12 @@ namespace Akka.Cluster.Sharding } [Akka.Annotations.InternalStableApiAttribute()] [System.Runtime.CompilerServices.NullableAttribute(0)] - public sealed class ShardRegion : Akka.Actor.ActorBase, Akka.Actor.IWithTimers + public sealed class ShardRegion : Akka.Actor.ActorBase { public ShardRegion(string typeName, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { 2, 1, 1})] System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.IMessageExtractor messageExtractor, object handOffStopMessage, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } protected override bool Receive(object message) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 38f5d852dcc..d975d2c5b97 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -256,13 +256,12 @@ namespace Akka.Cluster.Sharding } [Akka.Annotations.InternalStableApiAttribute()] [System.Runtime.CompilerServices.NullableAttribute(0)] - public sealed class ShardRegion : Akka.Actor.ActorBase, Akka.Actor.IWithTimers + public sealed class ShardRegion : Akka.Actor.ActorBase { public ShardRegion(string typeName, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { 2, 1, 1})] System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.IMessageExtractor messageExtractor, object handOffStopMessage, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } protected override bool Receive(object message) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index e87ea8710a2..cab1bee445d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -42,11 +42,10 @@ namespace Akka.Cluster.Tools.Client } } [System.Runtime.CompilerServices.NullableAttribute(0)] - public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } public Akka.Actor.IStash Stash { get; set; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void OnReceive(object message) { } protected override void PostStop() { } protected override void PreStart() { } @@ -243,11 +242,10 @@ namespace Akka.Cluster.Tools.PublishSubscribe public DistributedPubSubExtensionProvider() { } public override Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } - public class DistributedPubSubMediator : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers + public class DistributedPubSubMediator : Akka.Actor.ReceiveActor { public DistributedPubSubMediator(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } public System.Collections.Immutable.IImmutableDictionary OwnVersions { get; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } @@ -472,10 +470,9 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } - public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers + public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { } - public Akka.Actor.ITimerScheduler Timers { get; set; } public static Akka.Configuration.Config DefaultConfig() { } protected override void PostStop() { } protected override void PreStart() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index d2660fe7688..b92fc979dfe 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -42,11 +42,10 @@ namespace Akka.Cluster.Tools.Client } } [System.Runtime.CompilerServices.NullableAttribute(0)] - public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } public Akka.Actor.IStash Stash { get; set; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void OnReceive(object message) { } protected override void PostStop() { } protected override void PreStart() { } @@ -243,11 +242,10 @@ namespace Akka.Cluster.Tools.PublishSubscribe public DistributedPubSubExtensionProvider() { } public override Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } - public class DistributedPubSubMediator : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers + public class DistributedPubSubMediator : Akka.Actor.ReceiveActor { public DistributedPubSubMediator(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } public System.Collections.Immutable.IImmutableDictionary OwnVersions { get; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } @@ -472,10 +470,9 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } - public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers + public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { } - public Akka.Actor.ITimerScheduler Timers { get; set; } public static Akka.Configuration.Config DefaultConfig() { } protected override void PostStop() { } protected override void PreStart() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 7d8f3de5a78..8824fc236a9 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -70,6 +70,7 @@ namespace Akka.Actor public object CurrentMessage { get; } public Akka.Dispatch.MessageDispatcher Dispatcher { get; } public bool HasMessages { get; } + public bool HaveTimers { get; } public bool IsLocal { get; } protected bool IsNormal { get; } public bool IsTerminated { get; } @@ -86,6 +87,7 @@ namespace Akka.Actor public Akka.Actor.ActorSystem System { get; } public Akka.Actor.Internal.ActorSystemImpl SystemImpl { get; } public virtual Akka.Dispatch.ActorTaskScheduler TaskScheduler { get; } + public Akka.Actor.ITimerScheduler Timers { get; } public virtual Akka.Actor.IActorRef ActorOf(Akka.Actor.Props props, [System.Runtime.CompilerServices.NullableAttribute(2)] string name = null) { } public Akka.Actor.ActorSelection ActorSelection(string path) { } public Akka.Actor.ActorSelection ActorSelection(Akka.Actor.ActorPath path) { } @@ -982,12 +984,14 @@ namespace Akka.Actor public interface IActorContext : Akka.Actor.IActorRefFactory, Akka.Actor.ICanWatch { Akka.Dispatch.MessageDispatcher Dispatcher { get; } + bool HaveTimers { get; } Akka.Actor.IActorRef Parent { get; } Akka.Actor.Props Props { get; } System.Nullable ReceiveTimeout { get; } Akka.Actor.IActorRef Self { get; } Akka.Actor.IActorRef Sender { get; } Akka.Actor.ActorSystem System { get; } + Akka.Actor.ITimerScheduler Timers { get; } void Become(Akka.Actor.Receive receive); void BecomeStacked(Akka.Actor.Receive receive); Akka.Actor.IActorRef Child(string name); @@ -1227,10 +1231,6 @@ namespace Akka.Actor [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } - public interface IWithTimers - { - Akka.Actor.ITimerScheduler Timers { get; set; } - } public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 02e6be3fb03..9b26f56c1b2 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -70,6 +70,7 @@ namespace Akka.Actor public object CurrentMessage { get; } public Akka.Dispatch.MessageDispatcher Dispatcher { get; } public bool HasMessages { get; } + public bool HaveTimers { get; } public bool IsLocal { get; } protected bool IsNormal { get; } public bool IsTerminated { get; } @@ -86,6 +87,7 @@ namespace Akka.Actor public Akka.Actor.ActorSystem System { get; } public Akka.Actor.Internal.ActorSystemImpl SystemImpl { get; } public virtual Akka.Dispatch.ActorTaskScheduler TaskScheduler { get; } + public Akka.Actor.ITimerScheduler Timers { get; } public virtual Akka.Actor.IActorRef ActorOf(Akka.Actor.Props props, [System.Runtime.CompilerServices.NullableAttribute(2)] string name = null) { } public Akka.Actor.ActorSelection ActorSelection(string path) { } public Akka.Actor.ActorSelection ActorSelection(Akka.Actor.ActorPath path) { } @@ -980,12 +982,14 @@ namespace Akka.Actor public interface IActorContext : Akka.Actor.IActorRefFactory, Akka.Actor.ICanWatch { Akka.Dispatch.MessageDispatcher Dispatcher { get; } + bool HaveTimers { get; } Akka.Actor.IActorRef Parent { get; } Akka.Actor.Props Props { get; } System.Nullable ReceiveTimeout { get; } Akka.Actor.IActorRef Self { get; } Akka.Actor.IActorRef Sender { get; } Akka.Actor.ActorSystem System { get; } + Akka.Actor.ITimerScheduler Timers { get; } void Become(Akka.Actor.Receive receive); void BecomeStacked(Akka.Actor.Receive receive); Akka.Actor.IActorRef Child(string name); @@ -1225,10 +1229,6 @@ namespace Akka.Actor [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } - public interface IWithTimers - { - Akka.Actor.ITimerScheduler Timers { get; set; } - } public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt index b6a437a6d2b..d56d0a2edd9 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt @@ -866,12 +866,11 @@ namespace Akka.Persistence.Journal protected static System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } public abstract System.TimeSpan Timeout { get; } - public Akka.Actor.ITimerScheduler Timers { get; set; } public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken) { } @@ -1064,11 +1063,10 @@ namespace Akka.Persistence.Journal public Akka.Actor.IActorRef ReplyTo { get; } } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PreStart() { } protected override bool Receive(object message) { } public static void SetTargetLocation(Akka.Actor.ActorSystem system, Akka.Actor.Address address) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt index 5b8a811b2bb..a136e19d3d9 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt @@ -866,12 +866,11 @@ namespace Akka.Persistence.Journal protected static System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } public abstract System.TimeSpan Timeout { get; } - public Akka.Actor.ITimerScheduler Timers { get; set; } public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken) { } @@ -1064,11 +1063,10 @@ namespace Akka.Persistence.Journal public Akka.Actor.IActorRef ReplyTo { get; } } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } - public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PreStart() { } protected override bool Receive(object message) { } public static void SetTargetLocation(Akka.Actor.ActorSystem system, Akka.Actor.Address address) { } diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 04179e1d402..c634de5c68e 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -66,7 +66,7 @@ public override void Down(UniqueAddress node, IDecision decision) /// The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be /// able to unit test the logic without running cluster. /// - internal abstract class SplitBrainResolverBase : ActorBase, IWithUnboundedStash, IWithTimers + internal abstract class SplitBrainResolverBase : ActorBase, IWithUnboundedStash { // would be better as constructor parameter, but don't want to break Cinnamon instrumentation private readonly SplitBrainResolverSettings _settings; @@ -87,7 +87,7 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) _settings = new SplitBrainResolverSettings(Context.System.Settings.Config); - // ReSharper disable once VirtualMemberCallInConstructor + Timers = Context.Timers; Timers.StartPeriodicTimer(Tick.Instance, Tick.Instance, TickInterval); ResetStableDeadline(); @@ -111,7 +111,7 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) public bool IsResponsible => Leader && _selfMemberAdded; - public ITimerScheduler Timers { get; set; } + protected ITimerScheduler Timers { get; } public IStash Stash { get; set; } diff --git a/src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs b/src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs index 6d00c26415e..0e3154a50ea 100644 --- a/src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs +++ b/src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs @@ -24,15 +24,17 @@ public sealed class CanPrint{} public sealed class Print { } public sealed class Total { } - public sealed class TimerActor : ReceiveActor, IWithTimers + public sealed class TimerActor : ReceiveActor { - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } private int _count = 0; private readonly ILoggingAdapter _log = Context.GetLogger(); public TimerActor() { + Timers = Context.Timers; + Receive(i => { _count += i; @@ -52,15 +54,17 @@ protected override void PreStart() } // - public sealed class StartStopTimerActor : ReceiveActor, IWithTimers + public sealed class StartStopTimerActor : ReceiveActor { - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } private int _count = 0; private ILoggingAdapter _log = Context.GetLogger(); public StartStopTimerActor() { + Timers = Context.Timers; + Receive(i => { _count += i; diff --git a/src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs b/src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs index 76bee8483ff..a2053c3e36f 100644 --- a/src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs +++ b/src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs @@ -199,7 +199,7 @@ protected override void OnReceive(object message) // // - public sealed class ProducerActor : UntypedActor, IWithTimers, IWithStash + public sealed class ProducerActor : UntypedActor, IWithStash { private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -217,6 +217,11 @@ private Tick() public static Tick Instance { get; } = new(); } + public ProducerActor() + { + Timers = Context.Timers; + } + protected override void PreStart() { Timers.StartPeriodicTimer("Tick", Tick.Instance, TimeSpan.FromSeconds(0.25)); @@ -272,7 +277,7 @@ private Receive Active(IActorRef sendNextTo) } private string GetRandomItem() => Items[ThreadLocalRandom.Current.Next(Items.Count)]; - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public IStash Stash { get; set; } } // diff --git a/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs b/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs index 718bfb75461..9a31111f8fc 100644 --- a/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs +++ b/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs @@ -135,7 +135,7 @@ public void Actor_with_lease_should_release_lease_when_stopped() } -public class LeaseActor: ReceiveActor, IWithStash, IWithTimers +public class LeaseActor: ReceiveActor, IWithStash { #region messages private sealed record LeaseAcquireResult(bool Acquired, Exception? Reason); @@ -158,6 +158,8 @@ private LeaseRetryTick() { } #region constructor public LeaseActor(LeaseUsageSettings leaseSettings, string resourceId, string actorUniqueId) { + Timers = Context.Timers; + _resourceId = resourceId; _uniqueId = actorUniqueId; @@ -173,7 +175,7 @@ public LeaseActor(LeaseUsageSettings leaseSettings, string resourceId, string ac public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } #region actor-states private void AcquiringLease() diff --git a/src/core/Akka.Persistence.Tests/TimerPersistentActorSpec.cs b/src/core/Akka.Persistence.Tests/TimerPersistentActorSpec.cs index a59c368c3f5..775a3ea1d85 100644 --- a/src/core/Akka.Persistence.Tests/TimerPersistentActorSpec.cs +++ b/src/core/Akka.Persistence.Tests/TimerPersistentActorSpec.cs @@ -69,7 +69,7 @@ public AutoReceivedMessageWrapper(IAutoReceivedMessage msg) } } - internal class TestPersistentActor : PersistentActor, IWithTimers + internal class TestPersistentActor : PersistentActor { public static Props TestProps(string name) { @@ -80,10 +80,11 @@ public static Props TestProps(string name) public override string PersistenceId => name; - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public TestPersistentActor(string name) { + Timers = Context.Timers; this.name = name; } diff --git a/src/core/Akka.Persistence/Delivery/EventSourcedProducerQueue.cs b/src/core/Akka.Persistence/Delivery/EventSourcedProducerQueue.cs index a5b8c39488e..3c9064af6a2 100644 --- a/src/core/Akka.Persistence/Delivery/EventSourcedProducerQueue.cs +++ b/src/core/Akka.Persistence/Delivery/EventSourcedProducerQueue.cs @@ -134,7 +134,7 @@ public static Props Create(string persistentId, IActorRefFactory system) /// INTERNAL API /// /// The types of messages that can be handled by the . -internal sealed class EventSourcedProducerQueue : UntypedPersistentActor, IWithTimers, IWithStash +internal sealed class EventSourcedProducerQueue : UntypedPersistentActor, IWithStash { public EventSourcedProducerQueue(string persistenceId, EventSourcedProducerQueue.Settings? settings = null, ITimeProvider? timeProvider = null) @@ -145,6 +145,8 @@ public EventSourcedProducerQueue(string persistenceId, EventSourcedProducerQueue JournalPluginId = Settings.JournalPluginId; SnapshotPluginId = Settings.SnapshotPluginId; Self.Tell(EventSourcedProducerQueue.CleanupTick.Instance); + + Timers = Context.Timers; Timers.StartPeriodicTimer(EventSourcedProducerQueue.CleanupTick.Instance, EventSourcedProducerQueue.CleanupTick.Instance, TimeSpan.FromMilliseconds(Settings.CleanupUnusedAfter.TotalMilliseconds / 2)); @@ -153,7 +155,7 @@ public EventSourcedProducerQueue(string persistenceId, EventSourcedProducerQueue public EventSourcedProducerQueue.Settings Settings { get; } public override string PersistenceId { get; } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } private readonly ITimeProvider _timeProvider; private readonly ILoggingAdapter _log = Context.GetLogger(); diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs b/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs index 45e63b827b2..3fd0d68b9d4 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs @@ -252,7 +252,7 @@ public bool Equals(DeleteMessagesTo other) /// /// A journal that delegates actual storage to a target actor. For testing only. /// - public abstract class AsyncWriteProxy : AsyncWriteJournal, IWithUnboundedStash, IWithTimers + public abstract class AsyncWriteProxy : AsyncWriteJournal, IWithUnboundedStash { private const string InitTimeoutTimerKey = nameof(InitTimeoutTimerKey); @@ -265,6 +265,8 @@ protected AsyncWriteProxy() _isInitialized = false; _isInitTimedOut = false; _store = null; + + Timers = Context.Timers; } /// @@ -419,7 +421,7 @@ private static Task StoreNotInitialized() /// public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } // sent to self only /// diff --git a/src/core/Akka.Persistence/Journal/PersistencePluginProxy.cs b/src/core/Akka.Persistence/Journal/PersistencePluginProxy.cs index 1df9825cc4d..b1bb7efad51 100644 --- a/src/core/Akka.Persistence/Journal/PersistencePluginProxy.cs +++ b/src/core/Akka.Persistence/Journal/PersistencePluginProxy.cs @@ -18,7 +18,7 @@ namespace Akka.Persistence.Journal /// /// TBD /// - public class PersistencePluginProxy : ActorBase, IWithUnboundedStash, IWithTimers + public class PersistencePluginProxy : ActorBase, IWithUnboundedStash { private const string InitTimeoutTimerKey = nameof(InitTimeoutTimerKey); @@ -106,6 +106,8 @@ private class SnapshotStore : IPluginType /// public PersistencePluginProxy(Config config) { + Timers = Context.Timers; + _config = config; var pluginId = Self.Path.Name; if (pluginId.Equals("akka.persistence.journal.proxy")) @@ -129,7 +131,7 @@ public PersistencePluginProxy(Config config) /// public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } /// /// TBD diff --git a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs index f2b6e10441b..b061dadb48a 100644 --- a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs +++ b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs @@ -64,7 +64,7 @@ private ResendFinal() { } public static ResendFinal Instance { get; } = new(); } - private class SequenceVerifier : UntypedActor, IWithTimers + private class SequenceVerifier : UntypedActor { private const string SendNextTimerKey = nameof(SendNextTimerKey); private const string SendFinalTimerKey = nameof(SendFinalTimerKey); @@ -81,9 +81,10 @@ public SequenceVerifier(IActorRef remote, IActorRef controller) { _remote = remote; _controller = controller; + Timers = Context.Timers; } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } protected override void OnReceive(object message) { diff --git a/src/core/Akka.Tests/Actor/ActorLifeCycleFlowSpec.cs b/src/core/Akka.Tests/Actor/ActorLifeCycleFlowSpec.cs index 050eb2c9196..58b6fc150a5 100644 --- a/src/core/Akka.Tests/Actor/ActorLifeCycleFlowSpec.cs +++ b/src/core/Akka.Tests/Actor/ActorLifeCycleFlowSpec.cs @@ -83,7 +83,7 @@ public override string ToString() } } - private class LifeCycleActor : UntypedActor, IWithTimers + private class LifeCycleActor : UntypedActor { private readonly int _id; private readonly IActorRef _probe; @@ -96,9 +96,10 @@ public LifeCycleActor(IActorRef probe, int id, ImmutableArray st _probe = probe; _id = id; _startTimers = startTimers; + Timers = Context.Timers; } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } protected override void OnReceive(object message) { diff --git a/src/core/Akka.Tests/Actor/Scheduler/TimerSchedulerDebugSpec.cs b/src/core/Akka.Tests/Actor/Scheduler/TimerSchedulerDebugSpec.cs index 69f728708d8..4d3153031fe 100644 --- a/src/core/Akka.Tests/Actor/Scheduler/TimerSchedulerDebugSpec.cs +++ b/src/core/Akka.Tests/Actor/Scheduler/TimerSchedulerDebugSpec.cs @@ -19,14 +19,14 @@ namespace Akka.Tests.Actor.Scheduler; -internal sealed class TimerTestActor: UntypedActor, IWithTimers +internal sealed class TimerTestActor: UntypedActor { protected override void OnReceive(object message) { switch (message) { case "startTimer": - Timers.StartSingleTimer("test", "test", 1.Seconds()); + Context.Timers.StartSingleTimer("test", "test", 1.Seconds()); break; case "test": break; @@ -35,8 +35,6 @@ protected override void OnReceive(object message) break; } } - - public ITimerScheduler Timers { get; set; } } public class TimerSchedulerDebug: TestKit.Xunit2.TestKit diff --git a/src/core/Akka.Tests/Actor/TimerSpec.cs b/src/core/Akka.Tests/Actor/TimerSpec.cs index 039fcfab17a..7b95b2cf8dc 100644 --- a/src/core/Akka.Tests/Actor/TimerSpec.cs +++ b/src/core/Akka.Tests/Actor/TimerSpec.cs @@ -356,13 +356,13 @@ public Exc() } } - internal class Target : ActorBase, IWithTimers + internal class Target : ActorBase { private IActorRef monitor; private TimeSpan interval; int bumpCount; - public ITimerScheduler Timers { get ; set ; } + private ITimerScheduler Timers { get; } public Target(IActorRef monitor, TimeSpan interval, bool repeat, Func initial) { @@ -370,6 +370,7 @@ public Target(IActorRef monitor, TimeSpan interval, bool repeat, Func initi this.interval = interval; bumpCount = initial(); + Timers = Context.Timers; if (repeat) Timers.StartPeriodicTimer("T", new Tick(bumpCount), interval); else @@ -560,17 +561,18 @@ private StopStashing() } } - internal class ActorWithTimerAndStash : ActorBase, IWithUnboundedStash, IWithTimers + internal class ActorWithTimerAndStash : ActorBase, IWithUnboundedStash { private IActorRef probe; public IStash Stash { get; set; } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public ActorWithTimerAndStash(IActorRef probe) { this.probe = probe; + Timers = Context.Timers; Timers.StartSingleTimer("key", "scheduled", TimeSpan.FromMilliseconds(50)); Context.Become(Stashing); } diff --git a/src/core/Akka.Tests/Actor/TimerStartupCrashBugFixSpec.cs b/src/core/Akka.Tests/Actor/TimerStartupCrashBugFixSpec.cs index 865a12c77a5..cf0fa7b8421 100644 --- a/src/core/Akka.Tests/Actor/TimerStartupCrashBugFixSpec.cs +++ b/src/core/Akka.Tests/Actor/TimerStartupCrashBugFixSpec.cs @@ -27,7 +27,7 @@ public TimerStartupCrashBugFixSpec(ITestOutputHelper output) : base(output: outp Sys.Log.Info("Starting TimerStartupCrashBugFixSpec"); } - private class TimerActor : UntypedActor, IWithTimers + private class TimerActor : UntypedActor { public sealed class Check { @@ -49,8 +49,13 @@ private Hit() private readonly ILoggingAdapter _log = Context.GetLogger(); private int _counter = 0; - public ITimerScheduler? Timers { get; set; } = null; + private ITimerScheduler Timers { get; } + public TimerActor() + { + Timers = Context.Timers; + } + protected override void PreStart() { Timers?.StartPeriodicTimer("key", Hit.Instance, TimeSpan.FromMilliseconds(1)); diff --git a/src/core/Akka.Tests/Delivery/TestConsumer.cs b/src/core/Akka.Tests/Delivery/TestConsumer.cs index 02eee48f6c4..7d19e29178f 100644 --- a/src/core/Akka.Tests/Delivery/TestConsumer.cs +++ b/src/core/Akka.Tests/Delivery/TestConsumer.cs @@ -20,7 +20,7 @@ namespace Akka.Tests.Delivery; /// /// INTERNAL API /// -public sealed class TestConsumer : ReceiveActor, IWithTimers +public sealed class TestConsumer : ReceiveActor { public static readonly TimeSpan DefaultConsumerDelay = TimeSpan.FromMilliseconds(10); @@ -44,6 +44,7 @@ public TestConsumer(TimeSpan delay, Func endCondition, IActo EndCondition = endCondition; EndReplyTo = endReplyTo; ConsumerController = consumerController; + Timers = Context.Timers; _supportRestarts = supportRestarts; Active(); @@ -195,7 +196,7 @@ public static Props PropsFor(TimeSpan delay, Func endConditi IActorRef consumerController, bool supportsRestarts = false) => Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts)); - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } } /// diff --git a/src/core/Akka.Tests/Delivery/TestProducer.cs b/src/core/Akka.Tests/Delivery/TestProducer.cs index 517643b722f..bb55b0b82ed 100644 --- a/src/core/Akka.Tests/Delivery/TestProducer.cs +++ b/src/core/Akka.Tests/Delivery/TestProducer.cs @@ -15,7 +15,7 @@ namespace Akka.Tests.Delivery; /// /// INTERNAL API. /// -public sealed class TestProducer : ReceiveActor, IWithTimers +public sealed class TestProducer : ReceiveActor { public static readonly TimeSpan DefaultProducerDelay = TimeSpan.FromMilliseconds(20); @@ -30,7 +30,7 @@ private Tick() public int CurrentSequenceNr { get; private set; } public TimeSpan Delay { get; } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } private readonly IActorRef _producerController; private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -38,6 +38,7 @@ public TestProducer(TimeSpan delay, IActorRef producerController) { Delay = delay; _producerController = producerController; + Timers = Context.Timers; if (Delay == TimeSpan.Zero) ActiveNoDelay(); else diff --git a/src/core/Akka.Tests/Delivery/TestProducerWithAsk.cs b/src/core/Akka.Tests/Delivery/TestProducerWithAsk.cs index cce4bc5a3e5..2c354288a83 100644 --- a/src/core/Akka.Tests/Delivery/TestProducerWithAsk.cs +++ b/src/core/Akka.Tests/Delivery/TestProducerWithAsk.cs @@ -13,7 +13,7 @@ namespace Akka.Tests.Delivery; -public sealed class TestProducerWithAsk : ReceiveActor, IWithTimers +public sealed class TestProducerWithAsk : ReceiveActor { public static readonly TimeSpan DefaultAskTimeout = TimeSpan.FromSeconds(10); @@ -57,6 +57,7 @@ public TestProducerWithAsk(TimeSpan delay, IActorRef replyProbe, IActorRef produ _replyProbe = replyProbe; _producerController = producerController; + Timers = Context.Timers; Timers.StartPeriodicTimer(Tick.Instance, Tick.Instance, Delay); Idle(); } @@ -114,5 +115,5 @@ protected override void PreStart() _producerController.Tell(new ProducerController.Start(Self)); } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } } diff --git a/src/core/Akka/Actor/ActorBase.Lifecycle.cs b/src/core/Akka/Actor/ActorBase.Lifecycle.cs index 1a028da7b52..2edb26da548 100644 --- a/src/core/Akka/Actor/ActorBase.Lifecycle.cs +++ b/src/core/Akka/Actor/ActorBase.Lifecycle.cs @@ -20,8 +20,8 @@ public abstract partial class ActorBase /// The message. public virtual void AroundPreRestart(Exception cause, object message) { - if (this is IWithTimers withTimers) - withTimers.Timers?.CancelAll(); + if(Context.HaveTimers) + Context.Timers.CancelAll(); PreRestart(cause, message); } @@ -88,8 +88,8 @@ protected virtual void PostRestart(Exception reason) /// public virtual void AroundPostStop() { - if (this is IWithTimers withTimers) - withTimers.Timers?.CancelAll(); + if(Context.HaveTimers) + Context.Timers.CancelAll(); PostStop(); } diff --git a/src/core/Akka/Actor/ActorBase.cs b/src/core/Akka/Actor/ActorBase.cs index 550c8256ff2..d20d0ce98cf 100644 --- a/src/core/Akka/Actor/ActorBase.cs +++ b/src/core/Akka/Actor/ActorBase.cs @@ -124,9 +124,9 @@ protected ActorBase() if (ActorCell.Current == null) throw new ActorInitializationException("Do not create actors using 'new', always create them using an ActorContext/System"); - if (this is IWithTimers withTimers) - withTimers.Timers = new Scheduler.TimerScheduler(Context); - + if(Context.HaveTimers) + Context.Timers.CancelAll(); + Context.Become(Receive); } @@ -181,8 +181,9 @@ protected internal virtual bool AroundReceive(Receive receive, object message) { if (message is TimerScheduler.ITimerMsg tm) { - if (this is IWithTimers { Timers: TimerScheduler timers }) + if (Context.HaveTimers) { + var timers = (TimerScheduler)Context.Timers; switch (timers.InterceptTimerMsg(Context.System.Log, tm)) { case IAutoReceivedMessage m: diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 4677cb9aba3..90e4b5bcc61 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -16,6 +16,7 @@ using System.Reflection; using System.Runtime.Serialization; using System.Threading.Tasks; +using Akka.Actor.Scheduler; using Akka.Serialization; using Akka.Util; using Assert = System.Diagnostics.Debug; @@ -153,7 +154,19 @@ internal static ActorCell? Current /// public IActorRef Self { get { return _self; } } IActorRef IActorContext.Parent { get { return Parent; } } - + + private ITimerScheduler? _timers; + + public ITimerScheduler Timers + { + get + { + return _timers ??= new TimerScheduler(this); + } + } + + public bool HaveTimers => _timers is not null; + /// /// This actor's parent actor. /// diff --git a/src/core/Akka/Actor/IActorContext.cs b/src/core/Akka/Actor/IActorContext.cs index 31b26a221f4..a2f5990380f 100644 --- a/src/core/Akka/Actor/IActorContext.cs +++ b/src/core/Akka/Actor/IActorContext.cs @@ -89,6 +89,13 @@ public interface IActorContext : IActorRefFactory, ICanWatch /// IActorRef Parent { get; } + /// + /// Get a timer scheduler for this actor + /// + ITimerScheduler Timers { get; } + + bool HaveTimers { get; } + /// /// Changes the actor's behavior and replaces the current receive handler with the specified handler. /// diff --git a/src/core/Akka/Actor/Scheduler/IWithTimers.cs b/src/core/Akka/Actor/Scheduler/IWithTimers.cs deleted file mode 100644 index bfc00bacf44..00000000000 --- a/src/core/Akka/Actor/Scheduler/IWithTimers.cs +++ /dev/null @@ -1,21 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2022 Lightbend Inc. -// Copyright (C) 2013-2025 .NET Foundation -// -//----------------------------------------------------------------------- - -namespace Akka.Actor -{ - /// - /// Marker interface for adding Timers support - /// - public interface IWithTimers - { - /// - /// Gets or sets the TimerScheduler. This will be automatically populated by the framework in base constructor. - /// Implement this as an auto property. - /// - ITimerScheduler Timers { get; set; } - } -} diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 45a35c1d26e..4d4efc9147c 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -29,7 +29,7 @@ namespace Akka.Delivery.Internal; /// The types of messages handled by the and /// . /// -internal sealed class ConsumerController : ReceiveActor, IWithTimers, IWithStash +internal sealed class ConsumerController : ReceiveActor, IWithStash { /// /// Used only for testing to simulate network failures. @@ -48,6 +48,7 @@ public ConsumerController(Option producerControllerRegistration, Cons _producerControllerRegistration = producerControllerRegistration; Settings = settings; _fuzzingControl = fuzzingControl; + Timers = Context.Timers; _retryTimer = new RetryTimer(Settings.ResendIntervalMin, Settings.ResendIntervalMax, Timers); WaitForStart(); @@ -58,7 +59,7 @@ public ConsumerController(Option producerControllerRegistration, Cons public bool ResendLost => !Settings.OnlyFlowControl; public IStash Stash { get; set; } = null!; - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } protected internal override bool AroundReceive(Receive receive, object message) { diff --git a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs index 1ae0b9370a7..13546481ea1 100644 --- a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs @@ -26,7 +26,7 @@ namespace Akka.Delivery.Internal; /// INTERNAL API /// /// The type of message handled by this producer -internal sealed class ProducerController : ReceiveActor, IWithTimers +internal sealed class ProducerController : ReceiveActor { /// /// Default send function for when none are specified. @@ -57,6 +57,7 @@ public ProducerController(string producerId, { ProducerId = producerId; Settings = settings ?? ProducerController.Settings.Create(Context.System); + Timers = Context.Timers; _durableProducerQueueProps = durableProducerQueue; _timeProvider = timeProvider ?? Context.System.Scheduler; _fuzzingControl = fuzzingControl; @@ -86,6 +87,7 @@ public ProducerController(string producerId, { ProducerId = producerId; Settings = settings ?? ProducerController.Settings.Create(Context.System); + Timers = Context.Timers; _durableProducerQueueProps = durableProducerQueue; _timeProvider = timeProvider ?? Context.System.Scheduler; _fuzzingControl = fuzzingControl; @@ -106,7 +108,7 @@ public ProducerController(string producerId, public ProducerController.Settings Settings { get; } - public ITimerScheduler Timers { get; set; } = null!; + private ITimerScheduler Timers { get; } protected internal override bool AroundReceive(Receive receive, object message) { diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 01dd20f3486..8a783dde8da 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -39,7 +39,7 @@ public SocketAsyncActorEventArgs(IActorRef notifyMe, EventHandler - internal sealed class TcpListener : ActorBase, IRequiresMessageQueue, IWithTimers + internal sealed class TcpListener : ActorBase, IRequiresMessageQueue { private readonly TcpExt _tcp; private readonly IActorRef _bindCommander; // forwarded destination for Connected @@ -95,6 +95,7 @@ private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializa public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) { + Timers = Context.Timers; _tcp = tcp; _acceptLimit = tcp.Settings.BatchAcceptLimit; @@ -406,6 +407,6 @@ protected override void PostStop() } } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } } } \ No newline at end of file diff --git a/src/examples/Cluster/ClusterSharding/ShoppingCart/Producer.cs b/src/examples/Cluster/ClusterSharding/ShoppingCart/Producer.cs index 9bff9e9c41c..b16df2d9526 100644 --- a/src/examples/Cluster/ClusterSharding/ShoppingCart/Producer.cs +++ b/src/examples/Cluster/ClusterSharding/ShoppingCart/Producer.cs @@ -19,7 +19,7 @@ namespace ShoppingCart; /// /// Actor is responsible for producing messages /// -public sealed class Producer : ReceiveActor, IWithTimers +public sealed class Producer : ReceiveActor { private static readonly string[] Customers = new[] { @@ -39,12 +39,13 @@ private sealed class Produce private Produce() {} } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } public IActorRef SendNext { get; set; } = ActorRefs.Nobody; public Producer() { + Timers = Context.Timers; Idle(); } diff --git a/src/examples/Cluster/ClusterTools/ClusterToolsExample.Shared/WorkerManager.cs b/src/examples/Cluster/ClusterTools/ClusterToolsExample.Shared/WorkerManager.cs index e92becfab64..d79eb8ece45 100644 --- a/src/examples/Cluster/ClusterTools/ClusterToolsExample.Shared/WorkerManager.cs +++ b/src/examples/Cluster/ClusterTools/ClusterToolsExample.Shared/WorkerManager.cs @@ -13,7 +13,7 @@ namespace ClusterToolsExample.Shared; -public class WorkerManager : ReceiveActor, IWithTimers +public class WorkerManager : ReceiveActor { private const string BatchKey = nameof(BatchKey); private const string ReportKey = nameof(ReportKey); @@ -25,6 +25,7 @@ public WorkerManager() var log = Context.GetLogger(); var counter = Context.ActorOf(name: "workload-counter"); var workerRouter = GetWorkerRouter(counter); + Timers = Context.Timers; Receive(batch => { @@ -56,7 +57,7 @@ public WorkerManager() Timers.StartPeriodicTimer(ReportKey, SendReport.Instance, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)); } - public ITimerScheduler Timers { get; set; } + private ITimerScheduler Timers { get; } private IActorRef GetWorkerRouter(IActorRef counter) {