Skip to content
This repository was archived by the owner on Mar 20, 2025. It is now read-only.

Commit 25b4e5d

Browse files
authored
Merge pull request #84 from Arkatufus/Add_option_to_use_BIGINT_IDENTITY_for_journal_ordering_column
Add option to use BIGINT IDENTITY for journal ordering column
2 parents 4631086 + ca50adf commit 25b4e5d

File tree

6 files changed

+185
-7
lines changed

6 files changed

+185
-7
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ akka.persistence{
4242
4343
# defines column db type used to store payload. Available option: BYTEA (default), JSON, JSONB
4444
stored-as = BYTEA
45+
46+
# Setting used to toggle sequential read access when loading large objects
47+
# from journals and snapshot stores.
48+
sequential-access = off
49+
50+
# When turned on, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY`
51+
# for journal table schema creation.
52+
# NOTE: This only affects newly created tables, as such, it should not affect any
53+
# existing database.
54+
#
55+
# !!!!! WARNING !!!!!
56+
# To use this feature, you have to have PorsgreSql version 10 or above
57+
use-bigint-identity-for-ordering-column = off
4558
}
4659
}
4760
@@ -71,6 +84,10 @@ akka.persistence{
7184
7285
# defines column db type used to store payload. Available option: BYTEA (default), JSON, JSONB
7386
stored-as = BYTEA
87+
88+
# Setting used to toggle sequential read access when loading large objects
89+
# from journals and snapshot stores.
90+
sequential-access = off
7491
}
7592
}
7693
}
@@ -110,6 +127,23 @@ CREATE TABLE {your_metadata_table_name} (
110127
);
111128
```
112129

130+
Note that if you turn on the `akka.persistence.journal.postgresql.use-bigint-identity-for-ordering-column` flag, the journal table schema will be altered to the latest recommended primary key setting.
131+
```
132+
CREATE TABLE {your_journal_table_name} (
133+
ordering BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
134+
persistence_id VARCHAR(255) NOT NULL,
135+
sequence_nr BIGINT NOT NULL,
136+
is_deleted BOOLEAN NOT NULL,
137+
created_at BIGINT NOT NULL,
138+
manifest VARCHAR(500) NOT NULL,
139+
payload BYTEA NOT NULL,
140+
tags VARCHAR(100) NULL,
141+
serializer_id INTEGER NULL,
142+
CONSTRAINT {your_journal_table_name}_uq UNIQUE (persistence_id, sequence_nr)
143+
);
144+
```
145+
Since this script is only run once during table generation, we will not provide any migration path for this change, any migration is left as an exercise for the user.
146+
113147
### Migration
114148

115149
#### From 1.1.0 to 1.3.1
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="PostgreSqlJournalSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Immutable;
10+
using System.Linq;
11+
using System.Threading.Tasks;
12+
using Akka.Actor;
13+
using Akka.Configuration;
14+
using Akka.Persistence.TCK;
15+
using Akka.Persistence.TCK.Journal;
16+
using FluentAssertions;
17+
using Npgsql;
18+
using Xunit;
19+
using Xunit.Abstractions;
20+
21+
namespace Akka.Persistence.PostgreSql.Tests
22+
{
23+
[Collection("PostgreSqlSpec")]
24+
public class PostgreSqlJournalBigIntSpec : JournalSpec
25+
{
26+
private static Config Initialize(PostgresFixture fixture)
27+
{
28+
//need to make sure db is created before the tests start
29+
DbUtils.Initialize(fixture);
30+
31+
var config = @"
32+
akka.persistence {
33+
publish-plugin-commands = on
34+
journal {
35+
plugin = ""akka.persistence.journal.postgresql""
36+
postgresql {
37+
class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql""
38+
plugin-dispatcher = ""akka.actor.default-dispatcher""
39+
table-name = event_journal
40+
schema-name = public
41+
auto-initialize = on
42+
connection-string = """ + DbUtils.ConnectionString + @"""
43+
use-bigint-identity-for-ordering-column = on
44+
}
45+
}
46+
}
47+
akka.test.single-expect-default = 10s";
48+
49+
return ConfigurationFactory.ParseString(config);
50+
}
51+
52+
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
53+
protected override bool SupportsSerialization => false;
54+
55+
public PostgreSqlJournalBigIntSpec(ITestOutputHelper output, PostgresFixture fixture)
56+
: base(Initialize(fixture), "PostgreSqlJournalBigIntSpec", output)
57+
{
58+
Initialize();
59+
}
60+
61+
protected override void Dispose(bool disposing)
62+
{
63+
base.Dispose(disposing);
64+
DbUtils.Clean();
65+
}
66+
67+
[Fact]
68+
public async Task BigInt_Journal_ordering_column_data_type_should_be_BigInt()
69+
{
70+
using (var conn = new NpgsqlConnection(DbUtils.ConnectionString))
71+
{
72+
conn.Open();
73+
74+
var sql = $@"
75+
SELECT column_name, column_default, data_type, is_identity, identity_generation
76+
FROM information_schema.columns
77+
WHERE table_schema = 'public'
78+
AND table_name = 'event_journal'
79+
AND ordinal_position = 1";
80+
81+
using (var cmd = new NpgsqlCommand(sql, conn))
82+
{
83+
var reader = await cmd.ExecuteReaderAsync();
84+
await reader.ReadAsync();
85+
86+
// these are the "fingerprint" of BIGINT ... GENERATED ALWAYS AS IDENTITY
87+
reader.GetString(0).Should().Be("ordering");
88+
reader[1].Should().BeOfType<DBNull>();
89+
reader.GetString(2).Should().Be("bigint");
90+
reader.GetString(3).Should().Be("YES");
91+
reader.GetString(4).Should().Be("ALWAYS");
92+
}
93+
}
94+
}
95+
}
96+
}

src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalSpec.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,15 @@
55
// </copyright>
66
//-----------------------------------------------------------------------
77

8+
using System;
9+
using System.Reflection;
10+
using System.Threading.Tasks;
11+
using Akka.Actor;
812
using Akka.Configuration;
13+
using Akka.Persistence.PostgreSql.Journal;
914
using Akka.Persistence.TCK.Journal;
15+
using FluentAssertions;
16+
using Npgsql;
1017
using Xunit;
1118
using Xunit.Abstractions;
1219

@@ -54,5 +61,34 @@ protected override void Dispose(bool disposing)
5461
base.Dispose(disposing);
5562
DbUtils.Clean();
5663
}
64+
65+
[Fact]
66+
public async Task BigSerial_Journal_ordering_column_data_type_should_be_BigSerial()
67+
{
68+
using (var conn = new NpgsqlConnection(DbUtils.ConnectionString))
69+
{
70+
conn.Open();
71+
72+
var sql = $@"
73+
SELECT column_name, column_default, data_type, is_identity, identity_generation
74+
FROM information_schema.columns
75+
WHERE table_schema = 'public'
76+
AND table_name = 'event_journal'
77+
AND ordinal_position = 1";
78+
79+
using (var cmd = new NpgsqlCommand(sql, conn))
80+
{
81+
var reader = await cmd.ExecuteReaderAsync();
82+
await reader.ReadAsync();
83+
84+
// these are the "fingerprint" of BIGSERIAL
85+
reader.GetString(0).Should().Be("ordering");
86+
reader.GetString(1).Should().Be("nextval('event_journal_ordering_seq'::regclass)");
87+
reader.GetString(2).Should().Be("bigint");
88+
reader.GetString(3).Should().Be("NO");
89+
reader[4].Should().BeOfType<DBNull>();
90+
}
91+
}
92+
}
5793
}
5894
}

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public PostgreSqlJournal(Config journalConfig) : base(journalConfig)
4848
timeout: config.GetTimeSpan("connection-timeout"),
4949
storedAs: storedAs,
5050
defaultSerializer: config.GetString("serializer"),
51-
useSequentialAccess: config.GetBoolean("sequential-access")),
51+
useSequentialAccess: config.GetBoolean("sequential-access"),
52+
useBigIntPrimaryKey: config.GetBoolean("use-bigint-identity-for-ordering-column")),
5253
Context.System.Serialization,
5354
GetTimestampProvider(config.GetString("timestamp-provider")));
5455

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
3434
{
3535
var storedAs = configuration.StoredAs.ToString().ToUpperInvariant();
3636

37-
CreateEventsJournalSql = $@"
37+
CreateEventsJournalSql = $@"
3838
CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} (
39-
{Configuration.OrderingColumnName} BIGSERIAL NOT NULL PRIMARY KEY,
39+
{Configuration.OrderingColumnName} {(configuration.UseBigIntPrimaryKey ? "BIGINT GENERATED ALWAYS AS IDENTITY" : "BIGSERIAL")} NOT NULL PRIMARY KEY,
4040
{Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL,
4141
{Configuration.SequenceNrColumnName} BIGINT NOT NULL,
4242
{Configuration.IsDeletedColumnName} BOOLEAN NOT NULL,
@@ -46,8 +46,7 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
4646
{Configuration.TagsColumnName} VARCHAR(100) NULL,
4747
{Configuration.SerializerIdColumnName} INTEGER NULL,
4848
CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
49-
);
50-
";
49+
);";
5150

5251
CreateMetaTableSql = $@"
5352
CREATE TABLE IF NOT EXISTS {Configuration.FullMetaTableName} (
@@ -254,6 +253,7 @@ public class PostgreSqlQueryConfiguration : QueryConfiguration
254253
{
255254
public readonly StoredAsType StoredAs;
256255
public readonly JsonSerializerSettings JsonSerializerSettings;
256+
public readonly bool UseBigIntPrimaryKey;
257257

258258
public PostgreSqlQueryConfiguration(
259259
string schemaName,
@@ -270,14 +270,16 @@ public PostgreSqlQueryConfiguration(
270270
string serializerIdColumnName,
271271
TimeSpan timeout,
272272
StoredAsType storedAs,
273-
string defaultSerializer,
273+
string defaultSerializer,
274274
JsonSerializerSettings jsonSerializerSettings = null,
275-
bool useSequentialAccess = true)
275+
bool useSequentialAccess = true,
276+
bool useBigIntPrimaryKey = false)
276277
: base(schemaName, journalEventsTableName, metaTableName, persistenceIdColumnName, sequenceNrColumnName,
277278
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn,
278279
serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess)
279280
{
280281
StoredAs = storedAs;
282+
UseBigIntPrimaryKey = useBigIntPrimaryKey;
281283
JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings
282284
{
283285
ContractResolver = new AkkaContractResolver()

src/Akka.Persistence.PostgreSql/postgresql.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@
3737
# Setting used to toggle sequential read access when loading large objects
3838
# from journals and snapshot stores.
3939
sequential-access = off
40+
41+
# When turned on, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY`
42+
# for journal table schema creation.
43+
# NOTE: This only affects newly created tables, as such, it should not affect any
44+
# existing database.
45+
#
46+
# !!!!! WARNING !!!!!
47+
# To use this feature, you have to have PorsgreSql version 10 or above
48+
use-bigint-identity-for-ordering-column = off
4049
}
4150
}
4251

0 commit comments

Comments
 (0)