Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions cmd/litestream-vfs/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,74 @@ func TestVFS_Integration(t *testing.T) {
t.Fatalf("got %d, want %d", got, want)
}
})

t.Run("Updating", func(t *testing.T) {
client := file.NewReplicaClient(t.TempDir())
vfs := newVFS(t, client)
if err := sqlite3vfs.RegisterVFS("litestream", vfs); err != nil {
t.Fatalf("failed to register litestream vfs: %v", err)
}

db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db"))
db.MonitorInterval = 100 * time.Millisecond
db.Replica = litestream.NewReplica(db)
db.Replica.Client = client
db.Replica.SyncInterval = 100 * time.Millisecond
if err := db.Open(); err != nil {
t.Fatal(err)
}
sqldb0 := testingutil.MustOpenSQLDB(t, db.Path())
defer testingutil.MustCloseSQLDB(t, sqldb0)

t.Log("creating table")
if _, err := sqldb0.Exec("CREATE TABLE t (x)"); err != nil {
t.Fatal(err)
}
if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil {
t.Fatal(err)
}
time.Sleep(2 * db.MonitorInterval)

t.Log("opening vfs")
sqldb1, err := sql.Open("sqlite3", "file:/tmp/test.db?vfs=litestream")
if err != nil {
t.Fatalf("failed to open database: %v", err)
}
defer sqldb1.Close()

// Execute query
var x int
if err := sqldb1.QueryRow("SELECT * FROM t").Scan(&x); err != nil {
t.Fatalf("failed to query database: %v", err)
} else if got, want := x, 100; got != want {
t.Fatalf("got %d, want %d", got, want)
}

t.Log("updating source database")
// Update the value from the source database.
if _, err := sqldb0.Exec("UPDATE t SET x = 200"); err != nil {
t.Fatal(err)
}
time.Sleep(5 * db.MonitorInterval)

// Ensure replica has updated itself.
t.Log("ensuring replica has updated")
if err := sqldb1.QueryRow("SELECT * FROM t").Scan(&x); err != nil {
t.Fatalf("failed to query database: %v", err)
} else if got, want := x, 200; got != want {
t.Fatalf("got %d, want %d", got, want)
}
})
}

func newVFS(tb testing.TB, client litestream.ReplicaClient) *litestream.VFS {
tb.Helper()
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
return litestream.NewVFS(client, logger)

logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

vfs := litestream.NewVFS(client, logger)
vfs.PollInterval = 100 * time.Millisecond
return vfs
}
138 changes: 123 additions & 15 deletions vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,33 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"time"

"github.com/psanford/sqlite3vfs"
"github.com/superfly/ltx"
)

const (
DefaultPollInterval = 1 * time.Second
)

// VFS implements the SQLite VFS interface for Litestream.
// It is intended to be used for read replicas that read directly from S3.
type VFS struct {
client ReplicaClient
logger *slog.Logger

// PollInterval is the interval at which to poll the replica client for new
// LTX files. The index will be fetched for the new files automatically.
PollInterval time.Duration
}

func NewVFS(client ReplicaClient, logger *slog.Logger) *VFS {
return &VFS{
client: client,
logger: logger,
client: client,
logger: logger.With("vfs", "true"),
PollInterval: DefaultPollInterval,
}
}

Expand All @@ -31,6 +41,7 @@ func (vfs *VFS) Open(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, s
// TODO: Clone client w/ new path based on name.

f := NewVFSFile(vfs.client, name, vfs.logger.With("name", name))
f.PollInterval = vfs.PollInterval
if err := f.Open(); err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -62,21 +73,39 @@ func (vfs *VFS) FullPathname(name string) string {

// VFSFile implements the SQLite VFS file interface.
type VFSFile struct {
mu sync.Mutex
client ReplicaClient
name string

pos ltx.Pos
index map[uint32]ltx.PageIndexElem

wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

logger *slog.Logger

PollInterval time.Duration
}

func NewVFSFile(client ReplicaClient, name string, logger *slog.Logger) *VFSFile {
return &VFSFile{
client: client,
name: name,
index: make(map[uint32]ltx.PageIndexElem),
logger: logger,
f := &VFSFile{
client: client,
name: name,
index: make(map[uint32]ltx.PageIndexElem),
logger: logger,
PollInterval: DefaultPollInterval,
}
f.ctx, f.cancel = context.WithCancel(context.Background())
return f
}

// Pos returns the current position of the file.
func (f *VFSFile) Pos() ltx.Pos {
f.mu.Lock()
defer f.mu.Unlock()
return f.pos
}

func (f *VFSFile) Open() error {
Expand All @@ -91,14 +120,35 @@ func (f *VFSFile) Open() error {
return fmt.Errorf("no backup files available") // TODO: Open even when no files available.
}

// Determine the current position based off the latest LTX file.
var pos ltx.Pos
if len(infos) > 0 {
pos = ltx.Pos{TXID: infos[len(infos)-1].MaxTXID}
}
f.pos = pos

// Build the page index so we can lookup individual pages.
if err := f.buildIndex(f.ctx, infos); err != nil {
f.logger.Error("cannot build index", "error", err)
return fmt.Errorf("cannot build index: %w", err)
}

// Continuously monitor the replica client for new LTX files.
f.wg.Add(1)
go func() { defer f.wg.Done(); f.monitorReplicaClient(f.ctx) }()

return nil
}

// buildIndex constructs a lookup of pgno to LTX file offsets.
func (f *VFSFile) buildIndex(ctx context.Context, infos []*ltx.FileInfo) error {
index := make(map[uint32]ltx.PageIndexElem)
for _, info := range infos {
f.logger.Info("opening page index", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)
f.logger.Debug("opening page index", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)

// Read page index.
idx, err := FetchPageIndex(context.Background(), f.client, info)
if err != nil {
f.logger.Error("cannot fetch page index", "error", err)
return fmt.Errorf("fetch page index: %w", err)
}

Expand All @@ -108,7 +158,10 @@ func (f *VFSFile) Open() error {
index[k] = v
}
}

f.mu.Lock()
f.index = index
f.mu.Unlock()

return nil
}
Expand All @@ -133,12 +186,6 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
return 0, fmt.Errorf("fetch page: %w", err)
}

// TODO: Implement a size check, if possible. Not all reads are full pages.
//if len(data) != len(p) {
// f.logger.Error("page data length mismatch", "expected", len(p), "actual", len(data))
// return 0, fmt.Errorf("page data length mismatch: %d != %d", len(p), len(data))
//}

n = copy(p, data)
f.logger.Info("data read", "n", n, "data", len(data))

Expand Down Expand Up @@ -200,3 +247,64 @@ func (f *VFSFile) DeviceCharacteristics() sqlite3vfs.DeviceCharacteristic {
f.logger.Info("device characteristics")
return 0
}

func (f *VFSFile) monitorReplicaClient(ctx context.Context) {
ticker := time.NewTicker(f.PollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := f.pollReplicaClient(ctx); err != nil {
f.logger.Error("cannot fetch new ltx files", "error", err)
}
}
}
}

// pollReplicaClient fetches new LTX files from the replica client and updates
// the page index & the current position.
func (f *VFSFile) pollReplicaClient(ctx context.Context) error {
pos := f.Pos()
f.logger.Debug("polling replica client", "txid", pos.TXID.String())

// Start reading from the next LTX file after the current position.
itr, err := f.client.LTXFiles(ctx, 0, f.pos.TXID+1)
if err != nil {
return fmt.Errorf("ltx files: %w", err)
}

// Build an update across all new LTX files.
for itr.Next() {
info := itr.Item()

// Ensure we are fetching the next transaction from our current position.
f.mu.Lock()
isNextTXID := info.MinTXID == f.pos.TXID+1
f.mu.Unlock()
if !isNextTXID {
return fmt.Errorf("non-contiguous ltx file: current=%s, next=%s-%s", f.pos.TXID, info.MinTXID, info.MaxTXID)
}

f.logger.Debug("new ltx file", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)

// Read page index.
idx, err := FetchPageIndex(context.Background(), f.client, info)
if err != nil {
return fmt.Errorf("fetch page index: %w", err)
}

// Update the page index & current position.
f.mu.Lock()
for k, v := range idx {
f.logger.Debug("adding new page index", "page", k, "elem", v)
f.index[k] = v
}
f.pos.TXID = info.MaxTXID
f.mu.Unlock()
}

return nil
}