Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,18 +890,22 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error

// Atomically rename file to final path.
if err := os.Rename(tmpFilename, filename); err != nil {
db.maxLTXFileInfos.Lock()
delete(db.maxLTXFileInfos.m, 0) // clear cache if in unknown state
db.maxLTXFileInfos.Unlock()
return fmt.Errorf("rename ltx file: %w", err)
}

// Update file info cache for L0.
db.maxLTXFileInfos.Lock()
db.maxLTXFileInfos.m[0] = &ltx.FileInfo{
Level: 0,
MinTXID: txID,
MaxTXID: txID,
CreatedAt: time.Now(),
Size: enc.N(),
}
db.maxLTXFileInfos.Unlock()

db.Logger.Debug("db sync", "status", "ok")

Expand Down Expand Up @@ -1101,7 +1105,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {

// SnapshotReader returns the current position of the database & a reader that contains a full database snapshot.
func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
if db.pageSize == 0 {
if db.PageSize() == 0 {
return ltx.Pos{}, nil, fmt.Errorf("page size not initialized yet")
}

Expand Down
78 changes: 78 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -572,6 +573,83 @@ func TestDB_EnforceRetention(t *testing.T) {
}
}

// TestDB_ConcurrentMapWrite tests for race conditions in maxLTXFileInfos map access.
// This test specifically targets the concurrent map write issue found in db.go
// where sync() method writes to the map without proper locking.
// Run with: go test -race -run TestDB_ConcurrentMapWrite
func TestDB_ConcurrentMapWrite(t *testing.T) {
// Use the standard test helpers
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)

// Enable monitoring to trigger background operations
db.MonitorInterval = 10 * time.Millisecond

// Create a table
if _, err := sqldb.Exec(`CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)`); err != nil {
t.Fatal(err)
}

// Start multiple goroutines to trigger concurrent map access
var wg sync.WaitGroup
ctx := context.Background()

// Number of concurrent operations
const numGoroutines = 10

// Channel to signal start
start := make(chan struct{})

for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

// Wait for signal to start all goroutines simultaneously
<-start

// Perform operations that trigger map access
for j := 0; j < 5; j++ {
// This triggers sync() which had unprotected map access
if _, err := sqldb.Exec(`INSERT INTO t (value) VALUES (?)`, "test"); err != nil {
t.Logf("Goroutine %d: insert error: %v", id, err)
}

// Trigger Sync manually which accesses the map
if err := db.Sync(ctx); err != nil {
t.Logf("Goroutine %d: sync error: %v", id, err)
}

// Small delay to allow race to manifest
time.Sleep(time.Millisecond)
}
}(i)
}

// Additional goroutine for snapshot operations
wg.Add(1)
go func() {
defer wg.Done()
<-start

for i := 0; i < 3; i++ {
// This triggers Snapshot() which has protected map access
if _, err := db.Snapshot(ctx); err != nil {
t.Logf("Snapshot error: %v", err)
}
time.Sleep(5 * time.Millisecond)
}
}()

// Start all goroutines
close(start)

// Wait for completion
wg.Wait()

t.Log("Test completed without race condition")
}

func newDB(tb testing.TB, path string) *litestream.DB {
tb.Helper()
tb.Logf("db=%s", path)
Expand Down