Skip to content
This repository was archived by the owner on Mar 20, 2025. It is now read-only.

Commit e461aab

Browse files
committed
Fix broken SnapshotStore when sequential-access is turned on
1 parent 3f00ae4 commit e461aab

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="PostgreSqlSnapshotStoreSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using Akka.Configuration;
9+
using Akka.Persistence.TCK.Snapshot;
10+
using Akka.TestKit;
11+
using Xunit;
12+
using Xunit.Abstractions;
13+
14+
namespace Akka.Persistence.PostgreSql.Tests
15+
{
16+
[Collection("PostgreSqlSpec")]
17+
public class PostgreSqlSnapshotStoreSequentialAccessSpec : SnapshotStoreSpec
18+
{
19+
private static Config Initialize(PostgresFixture fixture)
20+
{
21+
//need to make sure db is created before the tests start
22+
DbUtils.Initialize(fixture);
23+
24+
var config = @"
25+
akka.persistence {
26+
publish-plugin-commands = on
27+
snapshot-store {
28+
plugin = ""akka.persistence.snapshot-store.postgresql""
29+
postgresql {
30+
class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql""
31+
plugin-dispatcher = ""akka.actor.default-dispatcher""
32+
table-name = snapshot_store
33+
schema-name = public
34+
auto-initialize = on
35+
connection-string = """ + DbUtils.ConnectionString + @"""
36+
sequential-access = on
37+
}
38+
}
39+
}
40+
akka.test.single-expect-default = 10s";
41+
42+
return ConfigurationFactory.ParseString(config);
43+
}
44+
45+
public PostgreSqlSnapshotStoreSequentialAccessSpec(ITestOutputHelper output, PostgresFixture fixture)
46+
: base(Initialize(fixture), "PostgreSqlSnapshotStoreSpec", output: output)
47+
{
48+
Initialize();
49+
}
50+
51+
protected override void Dispose(bool disposing)
52+
{
53+
base.Dispose(disposing);
54+
DbUtils.Clean();
55+
}
56+
57+
[Fact]
58+
public void SnapshotStore_should_save_and_overwrite_snapshot_with_same_sequence_number_unskipped()
59+
{
60+
TestProbe _senderProbe = CreateTestProbe();
61+
var md = Metadata[4];
62+
SnapshotStore.Tell(new SaveSnapshot(md, "s-5-modified"), _senderProbe.Ref);
63+
var md2 = _senderProbe.ExpectMsg<SaveSnapshotSuccess>().Metadata;
64+
Assert.Equal(md.SequenceNr, md2.SequenceNr);
65+
SnapshotStore.Tell(new LoadSnapshot(Pid, new SnapshotSelectionCriteria(md.SequenceNr), long.MaxValue), _senderProbe.Ref);
66+
var result = _senderProbe.ExpectMsg<LoadSnapshotResult>();
67+
Assert.Equal("s-5-modified", result.Snapshot.Snapshot.ToString());
68+
Assert.Equal(md.SequenceNr, result.Snapshot.Metadata.SequenceNr);
69+
// metadata timestamp may have been changed
70+
}
71+
}
72+
}

src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
139139
var sequenceNr = reader.GetInt64(1);
140140
var timestamp = new DateTime(reader.GetInt64(2));
141141
var manifest = reader.GetString(3);
142+
var payloadObject = reader[4];
142143

143144
int? serializerId = null;
144145
Type type = null;
@@ -151,7 +152,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
151152
serializerId = reader.GetInt32(5);
152153
}
153154

154-
var snapshot = _deserialize(type, reader[4], manifest, serializerId);
155+
var snapshot = _deserialize(type, payloadObject, manifest, serializerId);
155156

156157
var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp);
157158
return new SelectedSnapshot(metadata, snapshot);

0 commit comments

Comments
 (0)