Skip to content
This repository was archived by the owner on Mar 20, 2025. It is now read-only.

Commit 845d6a0

Browse files
authored
Merge branch 'dev' into #53_Fix_snapshot_store_sequential_access_is_broken
2 parents e461aab + 399017f commit 845d6a0

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
using System.Linq;
2020
using System.Runtime.CompilerServices;
2121
using System.Text;
22+
using System.Threading;
23+
using System.Threading.Tasks;
2224

2325
namespace Akka.Persistence.PostgreSql.Journal
2426
{
@@ -61,6 +63,16 @@ SELECT MAX(e.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.
6163
UNION
6264
SELECT MAX(m.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.FullMetaTableName} m WHERE m.{Configuration.PersistenceIdColumnName} = @PersistenceId) as u";
6365

66+
// As per https://github.com/akkadotnet/Akka.Persistence.PostgreSql/pull/72, apparently PostgreSQL does not like
67+
// it when you chain two deletes in a single command, so we have to split it into two.
68+
// The performance penalty should be minimal, depending on the network speed
69+
DeleteBatchSql = $@"
70+
DELETE FROM {Configuration.FullJournalTableName}
71+
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";
72+
73+
DeleteBatchSqlMetadata = $@"DELETE FROM {Configuration.FullMetaTableName}
74+
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";
75+
6476
switch (configuration.StoredAs)
6577
{
6678
case StoredAsType.ByteA:
@@ -108,6 +120,8 @@ SELECT MAX(e.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.
108120
protected override string CreateEventsJournalSql { get; }
109121
protected override string CreateMetaTableSql { get; }
110122
protected override string HighestSequenceNrSql { get; }
123+
protected override string DeleteBatchSql { get; }
124+
protected virtual string DeleteBatchSqlMetadata { get; }
111125

112126
protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
113127
{
@@ -190,6 +204,50 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader)
190204

191205
return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null, timestamp);
192206
}
207+
208+
public override async Task DeleteBatchAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long toSequenceNr)
209+
{
210+
using (var deleteCommand = GetCommand(connection, DeleteBatchSql))
211+
using (var deleteMetadataCommand = GetCommand(connection, DeleteBatchSqlMetadata))
212+
using (var highestSeqNrCommand = GetCommand(connection, HighestSequenceNrSql))
213+
{
214+
AddParameter(highestSeqNrCommand, "@PersistenceId", DbType.String, persistenceId);
215+
216+
AddParameter(deleteCommand, "@PersistenceId", DbType.String, persistenceId);
217+
AddParameter(deleteCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);
218+
219+
AddParameter(deleteMetadataCommand, "@PersistenceId", DbType.String, persistenceId);
220+
AddParameter(deleteMetadataCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);
221+
222+
using (var tx = connection.BeginTransaction())
223+
{
224+
deleteCommand.Transaction = tx;
225+
deleteMetadataCommand.Transaction = tx;
226+
highestSeqNrCommand.Transaction = tx;
227+
228+
var res = await highestSeqNrCommand.ExecuteScalarAsync(cancellationToken);
229+
var highestSeqNr = res is long ? Convert.ToInt64(res) : 0L;
230+
231+
await deleteCommand.ExecuteNonQueryAsync(cancellationToken);
232+
await deleteMetadataCommand.ExecuteNonQueryAsync(cancellationToken);
233+
234+
if (highestSeqNr <= toSequenceNr)
235+
{
236+
using (var updateCommand = GetCommand(connection, UpdateSequenceNrSql))
237+
{
238+
updateCommand.Transaction = tx;
239+
240+
AddParameter(updateCommand, "@PersistenceId", DbType.String, persistenceId);
241+
AddParameter(updateCommand, "@SequenceNr", DbType.Int64, highestSeqNr);
242+
243+
await updateCommand.ExecuteNonQueryAsync(cancellationToken);
244+
tx.Commit();
245+
}
246+
}
247+
else tx.Commit();
248+
}
249+
}
250+
}
193251
}
194252

195253
public class PostgreSqlQueryConfiguration : QueryConfiguration

0 commit comments

Comments
 (0)