
Commit f652186

Merge pull request #84 from benbjohnson/snapshot-interval
2 parents ce2d54c + afb8731

4 files changed (+113 −7 lines): cmd/litestream/main.go, litestream.go, replica.go, s3/s3.go

cmd/litestream/main.go

Lines changed: 7 additions & 0 deletions
@@ -243,6 +243,7 @@ type ReplicaConfig struct {
 	Retention              time.Duration `yaml:"retention"`
 	RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
 	SyncInterval           time.Duration `yaml:"sync-interval"` // s3 only
+	SnapshotInterval       time.Duration `yaml:"snapshot-interval"`
 	ValidationInterval     time.Duration `yaml:"validation-interval"`

 	// S3 settings
@@ -304,6 +305,9 @@ func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestrea
 	if v := c.RetentionCheckInterval; v > 0 {
 		r.RetentionCheckInterval = v
 	}
+	if v := c.SnapshotInterval; v > 0 {
+		r.SnapshotInterval = v
+	}
 	if v := c.ValidationInterval; v > 0 {
 		r.ValidationInterval = v
 	}
@@ -372,6 +376,9 @@ func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica,
 	if v := c.SyncInterval; v > 0 {
 		r.SyncInterval = v
 	}
+	if v := c.SnapshotInterval; v > 0 {
+		r.SnapshotInterval = v
+	}
 	if v := c.ValidationInterval; v > 0 {
 		r.ValidationInterval = v
 	}
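
The wiring above follows the same convention as the other interval settings: a missing or zero duration leaves the replica's default behavior in place, and only a positive value from the config overrides it. Below is a minimal sketch of that precedence pattern in Go; the stand-in structs and values are illustrative, not Litestream's actual types.

package main

import (
	"fmt"
	"time"
)

// replicaConfig stands in for the YAML-backed ReplicaConfig; a zero duration
// means the key was not present in the config file.
type replicaConfig struct {
	SnapshotInterval time.Duration
}

// replica stands in for the runtime replica; zero disables interval snapshots.
type replica struct {
	SnapshotInterval time.Duration
}

func main() {
	r := replica{}                                  // default: interval snapshots disabled
	c := replicaConfig{SnapshotInterval: time.Hour} // e.g. snapshot-interval: 1h

	// Only positive values override the default, mirroring the diff's
	// `if v := c.SnapshotInterval; v > 0 { r.SnapshotInterval = v }` blocks.
	if v := c.SnapshotInterval; v > 0 {
		r.SnapshotInterval = v
	}

	fmt.Println(r.SnapshotInterval) // 1h0m0s
}
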

litestream.go

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ const (

 // Litestream errors.
 var (
+	ErrNoGeneration     = errors.New("no generation available")
 	ErrNoSnapshots      = errors.New("no snapshots available")
 	ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
 )
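
The new ErrNoGeneration sentinel lets callers of the exported Snapshot methods (added below) tell "the database has not started a generation yet, so there is nothing to snapshot" apart from a real failure; the snapshotter goroutines skip it with a plain == comparison. A hedged sketch of how outside code might make the same check with errors.Is; the helper itself is illustrative, only the error value and the Snapshot signature come from this commit.

package example

import (
	"context"
	"errors"
	"log"

	"github.com/benbjohnson/litestream"
)

// trySnapshot requests an immediate snapshot and treats the "no generation
// yet" case as benign rather than an error worth logging.
func trySnapshot(ctx context.Context, r *litestream.FileReplica) {
	if err := r.Snapshot(ctx); err != nil {
		if errors.Is(err, litestream.ErrNoGeneration) {
			return // nothing has been written yet; a later attempt may succeed
		}
		log.Printf("snapshot failed: %v", err)
	}
}
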

replica.go

Lines changed: 53 additions & 4 deletions
@@ -98,8 +98,11 @@ type FileReplica struct {
 	walIndexGauge  prometheus.Gauge
 	walOffsetGauge prometheus.Gauge

+	// Frequency to create new snapshots.
+	SnapshotInterval time.Duration
+
 	// Time to keep snapshots and related WAL files.
-	// Database is snapshotted after interval and older WAL files are discarded.
+	// Database is snapshotted after interval, if needed, and older WAL files are discarded.
 	Retention time.Duration

 	// Time between checks for retention.
@@ -402,9 +405,10 @@ func (r *FileReplica) Start(ctx context.Context) {
 	ctx, r.cancel = context.WithCancel(ctx)

 	// Start goroutine to replicate data.
-	r.wg.Add(3)
+	r.wg.Add(4)
 	go func() { defer r.wg.Done(); r.monitor(ctx) }()
 	go func() { defer r.wg.Done(); r.retainer(ctx) }()
+	go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
 	go func() { defer r.wg.Done(); r.validator(ctx) }()
 }

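
The Start change only grows the existing goroutine bookkeeping from three background loops to four. For context, each loop is registered on a WaitGroup and tied to a cancellable context, so shutdown amounts to cancelling the context and waiting; a generic sketch of that lifecycle follows (the service type and stop method are illustrative, not Litestream's API).

package example

import (
	"context"
	"sync"
)

// worker is a long-running loop that exits when its context is cancelled.
type worker func(ctx context.Context)

type service struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// start launches every worker in its own goroutine, counting each one on the
// WaitGroup before the goroutines begin, just as Start does above.
func (s *service) start(ctx context.Context, workers ...worker) {
	ctx, s.cancel = context.WithCancel(ctx)
	s.wg.Add(len(workers))
	for _, w := range workers {
		w := w
		go func() { defer s.wg.Done(); w(ctx) }()
	}
}

// stop cancels the shared context and blocks until every worker has returned.
func (s *service) stop() {
	s.cancel()
	s.wg.Wait()
}
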

@@ -446,7 +450,18 @@ func (r *FileReplica) monitor(ctx context.Context) {

 // retainer runs in a separate goroutine and handles retention.
 func (r *FileReplica) retainer(ctx context.Context) {
-	ticker := time.NewTicker(r.RetentionCheckInterval)
+	// Disable retention enforcement if retention period is non-positive.
+	if r.Retention <= 0 {
+		return
+	}
+
+	// Ensure check interval is not longer than retention period.
+	checkInterval := r.RetentionCheckInterval
+	if checkInterval > r.Retention {
+		checkInterval = r.Retention
+	}
+
+	ticker := time.NewTicker(checkInterval)
 	defer ticker.Stop()

 	for {
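
Two behavioral changes are folded into this hunk: a non-positive Retention now disables retention enforcement entirely, and the check interval is clamped so that a short retention period is not undermined by a longer check cadence. For example, with a hypothetical retention of 30 minutes and a configured check interval of 1 hour, the ticker effectively fires every 30 minutes. A minimal sketch of the clamp:

package example

import "time"

// effectiveCheckInterval mirrors the clamp in retainer: never wait longer
// between retention checks than the retention period itself. Callers are
// assumed to have already rejected a non-positive retention.
func effectiveCheckInterval(retention, checkInterval time.Duration) time.Duration {
	if checkInterval > retention {
		return retention
	}
	return checkInterval
}

// effectiveCheckInterval(30*time.Minute, time.Hour) == 30*time.Minute
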
@@ -462,6 +477,28 @@ func (r *FileReplica) retainer(ctx context.Context) {
 	}
 }

+// snapshotter runs in a separate goroutine and handles snapshotting.
+func (r *FileReplica) snapshotter(ctx context.Context) {
+	if r.SnapshotInterval <= 0 {
+		return
+	}
+
+	ticker := time.NewTicker(r.SnapshotInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
+				log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
+				continue
+			}
+		}
+	}
+}
+
 // validator runs in a separate goroutine and handles periodic validation.
 func (r *FileReplica) validator(ctx context.Context) {
 	// Initialize counters since validation occurs infrequently.
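
The early return on a non-positive SnapshotInterval is both the feature's off switch and a safety guard: time.NewTicker panics when handed a duration of zero or less, so the goroutine must bail out before constructing the ticker. A generic sketch of the same ticker loop with that guard (the doWork callback is illustrative):

package example

import (
	"context"
	"time"
)

// runEvery calls doWork on a fixed interval until ctx is cancelled.
// A non-positive interval means "disabled"; returning early also avoids the
// panic that time.NewTicker raises for intervals <= 0.
func runEvery(ctx context.Context, interval time.Duration, doWork func(context.Context)) {
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			doWork(ctx)
		}
	}
}
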
@@ -531,6 +568,18 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos,
 	return pos, nil
 }

+// Snapshot copies the entire database to the replica path.
+func (r *FileReplica) Snapshot(ctx context.Context) error {
+	// Find current position of database.
+	pos, err := r.db.Pos()
+	if err != nil {
+		return fmt.Errorf("cannot determine current db generation: %w", err)
+	} else if pos.IsZero() {
+		return ErrNoGeneration
+	}
+	return r.snapshot(ctx, pos.Generation, pos.Index)
+}
+
 // snapshot copies the entire database to the replica path.
 func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error {
 	// Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -557,7 +606,7 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
 		return err
 	}

-	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
+	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
 	return nil
 }
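
Two small details of the log line are worth noting: the snapshot index is rendered as zero-padded hex via %08x, and the elapsed time is now truncated to millisecond precision so logs no longer print nanosecond-long duration strings. Both are plain standard-library behavior:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Snapshot indexes are logged as zero-padded 8-digit hex, e.g. index 12:
	fmt.Printf("%08x\n", 12) // 0000000c

	// Truncate drops sub-millisecond noise from elapsed times in logs.
	elapsed := 1234567891 * time.Nanosecond
	fmt.Println(elapsed)                            // 1.234567891s
	fmt.Println(elapsed.Truncate(time.Millisecond)) // 1.234s
}
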

s3/s3.go

Lines changed: 52 additions & 3 deletions
@@ -80,6 +80,9 @@ type Replica struct {
 	// Time between syncs with the shadow WAL.
 	SyncInterval time.Duration

+	// Frequency to create new snapshots.
+	SnapshotInterval time.Duration
+
 	// Time to keep snapshots and related WAL files.
 	// Database is snapshotted after interval and older WAL files are discarded.
 	Retention time.Duration
@@ -427,9 +430,10 @@ func (r *Replica) Start(ctx context.Context) {
 	ctx, r.cancel = context.WithCancel(ctx)

 	// Start goroutines to manage replica data.
-	r.wg.Add(3)
+	r.wg.Add(4)
 	go func() { defer r.wg.Done(); r.monitor(ctx) }()
 	go func() { defer r.wg.Done(); r.retainer(ctx) }()
+	go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
 	go func() { defer r.wg.Done(); r.validator(ctx) }()
 }

@@ -479,7 +483,18 @@ func (r *Replica) monitor(ctx context.Context) {

 // retainer runs in a separate goroutine and handles retention.
 func (r *Replica) retainer(ctx context.Context) {
-	ticker := time.NewTicker(r.RetentionCheckInterval)
+	// Disable retention enforcement if retention period is non-positive.
+	if r.Retention <= 0 {
+		return
+	}
+
+	// Ensure check interval is not longer than retention period.
+	checkInterval := r.RetentionCheckInterval
+	if checkInterval > r.Retention {
+		checkInterval = r.Retention
+	}
+
+	ticker := time.NewTicker(checkInterval)
 	defer ticker.Stop()

 	for {
@@ -495,6 +510,28 @@ func (r *Replica) retainer(ctx context.Context) {
 	}
 }

+// snapshotter runs in a separate goroutine and handles snapshotting.
+func (r *Replica) snapshotter(ctx context.Context) {
+	if r.SnapshotInterval <= 0 {
+		return
+	}
+
+	ticker := time.NewTicker(r.SnapshotInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration {
+				log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
+				continue
+			}
+		}
+	}
+}
+
 // validator runs in a separate goroutine and handles periodic validation.
 func (r *Replica) validator(ctx context.Context) {
 	// Initialize counters since validation occurs infrequently.
@@ -572,6 +609,18 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea
 	return pos, nil
 }

+// Snapshot copies the entire database to the replica path.
+func (r *Replica) Snapshot(ctx context.Context) error {
+	// Find current position of database.
+	pos, err := r.db.Pos()
+	if err != nil {
+		return fmt.Errorf("cannot determine current db generation: %w", err)
+	} else if pos.IsZero() {
+		return litestream.ErrNoGeneration
+	}
+	return r.snapshot(ctx, pos.Generation, pos.Index)
+}
+
 // snapshot copies the entire database to the replica path.
 func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
 	// Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -620,7 +669,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 	r.putOperationTotalCounter.Inc()
 	r.putOperationBytesCounter.Add(float64(fi.Size()))

-	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
+	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))

 	return nil
 }
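
The context lines in this final hunk also show how the S3 replica accounts for uploads: a put-operation counter and a bytes counter are incremented for each snapshot PUT. A hedged sketch of that bookkeeping with the Prometheus client library; the metric names and registration are assumptions, only the Inc/Add pattern comes from the diff.

package example

import (
	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative counters in the spirit of putOperationTotalCounter and
// putOperationBytesCounter; the names and registration are assumptions.
var (
	putOps = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_s3_put_operations_total",
		Help: "Number of PUT operations issued to the replica bucket.",
	})
	putBytes = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_s3_put_bytes_total",
		Help: "Number of bytes uploaded to the replica bucket.",
	})
)

func init() {
	prometheus.MustRegister(putOps, putBytes)
}

// recordPut mirrors the accounting done after a successful snapshot upload.
func recordPut(size int64) {
	putOps.Inc()
	putBytes.Add(float64(size))
}
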
