Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/benchmark/Akka.Benchmarks/IO/TcpOperationsBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public EchoConnection(IActorRef connection, int messageSize)
}
}

private class ClientCoordinator : ReceiveActor, IWithTimers, IWithStash
private class ClientCoordinator : ReceiveActor, IWithStash
{
private readonly IActorRef _echoServer;
private readonly HashSet<IActorRef> _waitingChildren = new();
Expand All @@ -211,6 +211,7 @@ public ClientCoordinator(IActorRef echoServer, int clientsCount)
{
_echoServer = echoServer;
_clientsCount = clientsCount;
Timers = Context.Timers;

Receive<EndPoint>(endpoint =>
{
Expand Down Expand Up @@ -281,7 +282,7 @@ protected override void PreStart()
_echoServer.Tell(GetBindAddress.Instance);
}

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }
public IStash Stash { get; set; }
}

Expand Down Expand Up @@ -322,7 +323,7 @@ public IEnumerable<ByteString> Deframe(ByteString data)
}
}

private class Client : ReceiveActor, IWithTimers
private class Client : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private int _receivedCount = 0;
Expand All @@ -344,6 +345,8 @@ private RetryConnect()
public Client(EndPoint endpoint, int messagesToSend, byte[] message)
{
_framer = new Framer(message.Length);
Timers = Context.Timers;

var write =
// create the write only once
Tcp.Write.Create(ByteString.FromBytes(message));
Expand Down Expand Up @@ -409,7 +412,7 @@ private static void DoConnect(EndPoint endpoint)
Context.System.Tcp().Tell(new Tcp.Connect(endpoint, timeout: TimeSpan.FromSeconds(5)));
}

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SetStore(IActorRef store) =>
/// <summary>
/// A journal that delegates actual storage to a target actor. For testing only.
/// </summary>
public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash, IWithTimers
public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash
{
private const string InitTimeoutTimerKey = nameof(InitTimeoutTimerKey);

Expand All @@ -99,6 +99,7 @@ protected AsyncWriteProxyEx()
_isInitialized = false;
_isInitTimedOut = false;
_store = null;
Timers = Context.Timers;
}

/// <summary>
Expand Down Expand Up @@ -257,7 +258,7 @@ private Task<T> StoreNotInitialized<T>()
/// </summary>
public IStash Stash { get; set; }

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ protected override bool Receive(object message)
}

// slow stop previously made it more likely that the coordinator would stop before the local region
public class SlowStopShardedEntity : ActorBase, IWithTimers
public class SlowStopShardedEntity : ActorBase
{
#region StopMessage
public class Stop
Expand All @@ -143,8 +143,6 @@ private ActualStop()
}
}

public ITimerScheduler Timers { get; set; }

#region DelayedStop
protected override bool Receive(object message)
{
Expand All @@ -154,7 +152,7 @@ protected override bool Receive(object message)
Sender.Tell(id);
return true;
case Stop _:
Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50));
Context.Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50));
return true;
case ActualStop _:
Context.Stop(Self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery;
/// <summary>
/// INTERNAL API
/// </summary>
public sealed class SlowStopConsumerEntity : ReceiveActor, IWithTimers
public sealed class SlowStopConsumerEntity : ReceiveActor
{
private readonly IActorRef _consumerController;

public SlowStopConsumerEntity(string persistenceId, IActorRef consumerController)
{
_consumerController = consumerController;
Timers = Context.Timers;

Receive<ConsumerController.Delivery<Job>>(delivery =>
{
Expand Down Expand Up @@ -57,5 +58,5 @@ private ActualStop() { }

public sealed record Job(int Payload, IActorRef Probe);

public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery;

internal class TestShardingProducer : ReceiveActor, IWithTimers, IWithStash
internal class TestShardingProducer : ReceiveActor, IWithStash
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IActorRef _producerController;
Expand All @@ -22,10 +22,11 @@ public TestShardingProducer(IActorRef producerController, IActorRef probe)
{
_producerController = producerController;
_probe = probe;
Timers = Context.Timers;
Idle();
}

public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }
public IStash Stash { get; set; } = null!;
public IActorRef SendNext { get; set; } = ActorRefs.Nobody;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace Akka.Cluster.Sharding.Tests
{
public abstract class SnapshotStoreProxy : SnapshotStore, IWithUnboundedStash, IWithTimers
public abstract class SnapshotStoreProxy : SnapshotStore, IWithUnboundedStash
{
private const string TimeoutTimerKey = nameof(TimeoutTimerKey);

Expand All @@ -39,6 +39,7 @@ protected SnapshotStoreProxy()
_isInitialized = false;
_isInitTimedOut = false;
_store = null;
Timers = Context.Timers;
}

/// <summary>
Expand All @@ -51,7 +52,7 @@ protected SnapshotStoreProxy()
/// </summary>
public IStash Stash { get; set; }

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }

/// <summary>
/// TBD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ protected override void PreStart()
}
}

private class TestShardingProducer : ReceiveActor, IWithTimers
private class TestShardingProducer : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IActorRef _producerController;
Expand All @@ -553,8 +553,6 @@ public TestShardingProducer(IActorRef producerController)
Idle(0);
}

public ITimerScheduler Timers { get; set; } = null!;

protected override void PreStart()
{
var self = Self;
Expand All @@ -570,7 +568,7 @@ protected override void PreStart()
}, "sendNextAdapter");

// simulate fast producer
Timers.StartPeriodicTimer("tick", Tick.Instance, TimeSpan.FromMilliseconds(20));
Context.Timers.StartPeriodicTimer("tick", Tick.Instance, TimeSpan.FromMilliseconds(20));
}

private void Idle(int n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public Props CoordinatorStoreProps()
}
}

private class FakeShardStoreActor : ActorBase, IWithTimers
private class FakeShardStoreActor : ActorBase
{
public static Props Props(string shardId) => Actor.Props.Create(() => new FakeShardStoreActor(shardId));

Expand Down Expand Up @@ -202,10 +202,11 @@ public Delayed(IActorRef replyTo, object msg)
public FakeShardStoreActor(string shardId)
{
this.shardId = shardId;
Timers = Context.Timers;
Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId));
}

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }

//protected override void PreStart()
//{
Expand Down Expand Up @@ -277,7 +278,7 @@ protected override bool Receive(object message)
}
}

private class FakeCoordinatorStoreActor : ActorBase, IWithTimers
private class FakeCoordinatorStoreActor : ActorBase
{
public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor());

Expand Down Expand Up @@ -306,10 +307,11 @@ public ClearFailShard(string shardId)
private readonly ILoggingAdapter log = Context.GetLogger();
private ImmutableDictionary<string, IFail> failAddShard = ImmutableDictionary<string, IFail>.Empty;

public ITimerScheduler Timers { get; set; }
private ITimerScheduler Timers { get; }

public FakeCoordinatorStoreActor()
{
Timers = Context.Timers;
Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public Props CoordinatorStoreProps()
}
}

private class FakeShardStoreActor : ActorBase, IWithTimers
private class FakeShardStoreActor : ActorBase
{
public static Props Props(string shardId) => Actor.Props.Create(() => new FakeShardStoreActor(shardId));

Expand All @@ -185,8 +185,6 @@ public FakeShardStoreActor(string shardId)
Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId));
}

public ITimerScheduler Timers { get; set; }

protected override bool Receive(object message)
{
switch (message)
Expand All @@ -202,14 +200,12 @@ protected override bool Receive(object message)
}
}

private class FakeCoordinatorStoreActor : ActorBase, IWithTimers
private class FakeCoordinatorStoreActor : ActorBase
{
public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor());

private readonly ILoggingAdapter _log = Context.GetLogger();

public ITimerScheduler Timers { get; set; }

public FakeCoordinatorStoreActor()
{
Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Akka.Cluster.Sharding
/// <summary>
/// Singleton coordinator (with state based on ddata) that decides where to allocate shards.
/// </summary>
internal sealed class DDataShardCoordinator : ActorBase, IWithTimers, IWithUnboundedStash
internal sealed class DDataShardCoordinator : ActorBase, IWithUnboundedStash
{

private sealed class RememberEntitiesStoreStopped
Expand Down Expand Up @@ -90,7 +90,7 @@ internal static Props Props(
private readonly IActorRef? _rememberEntitiesStore;
private readonly bool _rememberEntities;

public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }
public IStash Stash { get; set; } = null!;

private string TypeName => _baseImpl.TypeName;
Expand Down Expand Up @@ -130,6 +130,7 @@ public DDataShardCoordinator(

_initEmptyState = CoordinatorState.Empty.WithRememberEntities(settings.RememberEntities);

Timers = Context.Timers;

if (rememberEntitiesStoreProvider != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Akka.Cluster.Sharding.Delivery.Internal;
/// INTERNAL API
/// </summary>
/// <typeparam name="T">The types of messages handled by the ConsumerController</typeparam>
internal class ShardingConsumerController<T> : ReceiveActor, IWithStash, IWithTimers
internal class ShardingConsumerController<T> : ReceiveActor, IWithStash
{
private const string ShutdownTimeoutTimerKey = nameof(ShutdownTimeoutTimerKey);

Expand All @@ -35,6 +35,7 @@ public ShardingConsumerController(Func<IActorRef, Props> consumerProps,
{
ConsumerProps = consumerProps;
Settings = settings;
Timers = Context.Timers;
WaitForStart();
}

Expand Down Expand Up @@ -257,5 +258,5 @@ protected override void PreStart()
}

public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace Akka.Cluster.Sharding.Delivery.Internal;
using TotalSeqNr = Int64;
using OutSeqNr = Int64;

internal sealed class ShardingProducerController<T> : ReceiveActor, IWithStash, IWithTimers
internal sealed class ShardingProducerController<T> : ReceiveActor, IWithStash
{
public string ProducerId { get; }

Expand All @@ -48,7 +48,7 @@ internal sealed class ShardingProducerController<T> : ReceiveActor, IWithStash,
private readonly ILoggingAdapter _log = Context.GetLogger();
public IStash Stash { get; set; } = null!;

public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }

public ShardingProducerController(string producerId, IActorRef shardRegion, Option<Props> durableQueueProps,
ShardingProducerController.Settings settings, ITimeProvider? timeProvider = null)
Expand All @@ -59,6 +59,7 @@ public ShardingProducerController(string producerId, IActorRef shardRegion, Opti
Settings = settings;
_timeProvider = timeProvider ?? Context.System.Scheduler;

Timers = Context.Timers;
WaitingForStart(Option<IActorRef>.None, CreateInitialState(_durableQueueProps.HasValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Akka.Cluster.Sharding.Internal
using EntityId = String;
using ShardId = String;

internal class RememberEntityStarter : ActorBase, IWithTimers
internal class RememberEntityStarter : ActorBase
{
public static Props Props(
IActorRef region,
Expand Down Expand Up @@ -90,10 +90,13 @@ public RememberEntityStarter(
OnStartBatch(settings.TuningParameters.EntityRecoveryConstantRateStrategyNumberOfEntities);
break;
}

Timers = Context.Timers;

Timers.StartPeriodicTimer("retry", ResendUnAcked.Instance, settings.TuningParameters.RetryInterval);
}

public ITimerScheduler Timers { get; set; } = null!;
private ITimerScheduler Timers { get; }

public ILoggingAdapter Log { get; } = Context.GetLogger();

Expand Down
Loading
Loading