Skip to content

Commit 4bbd699

Browse files
corylanouclaude
andauthored
Fix issue #689: Continuous snapshotting every second (#691)
Co-authored-by: Claude <[email protected]>
1 parent 6c5cd9c commit 4bbd699

File tree

4 files changed

+613
-16
lines changed

4 files changed

+613
-16
lines changed

cmd/litestream/main.go

Lines changed: 138 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"io"
89
"log/slog"
910
"net/url"
1011
"os"
@@ -37,6 +38,33 @@ var (
3738
// errStop is a terminal error for indicating program should quit.
3839
var errStop = errors.New("stop")
3940

41+
// Sentinel errors for configuration validation
42+
var (
43+
ErrInvalidSnapshotInterval = errors.New("snapshot interval must be greater than 0")
44+
ErrInvalidSnapshotRetention = errors.New("snapshot retention must be greater than 0")
45+
ErrInvalidCompactionInterval = errors.New("compaction interval must be greater than 0")
46+
ErrInvalidSyncInterval = errors.New("sync interval must be greater than 0")
47+
ErrConfigFileNotFound = errors.New("config file not found")
48+
)
49+
50+
// ConfigValidationError wraps a validation error with additional context
51+
type ConfigValidationError struct {
52+
Err error
53+
Field string
54+
Value interface{}
55+
}
56+
57+
func (e *ConfigValidationError) Error() string {
58+
if e.Value != nil {
59+
return fmt.Sprintf("%s: %v (got %v)", e.Field, e.Err, e.Value)
60+
}
61+
return fmt.Sprintf("%s: %v", e.Field, e.Err)
62+
}
63+
64+
func (e *ConfigValidationError) Unwrap() error {
65+
return e.Err
66+
}
67+
4068
func main() {
4169
m := NewMain()
4270
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
@@ -190,8 +218,8 @@ type Config struct {
190218

191219
// SnapshotConfig configures snapshots.
192220
type SnapshotConfig struct {
193-
Interval time.Duration `yaml:"interval"`
194-
Retention time.Duration `yaml:"retention"`
221+
Interval *time.Duration `yaml:"interval"`
222+
Retention *time.Duration `yaml:"retention"`
195223
}
196224

197225
// LoggingConfig configures logging.
@@ -217,18 +245,73 @@ func (c *Config) propagateGlobalSettings() {
217245

218246
// DefaultConfig returns a new instance of Config with defaults set.
219247
func DefaultConfig() Config {
248+
defaultSnapshotInterval := 24 * time.Hour
249+
defaultSnapshotRetention := 24 * time.Hour
220250
return Config{
221251
Levels: []*CompactionLevelConfig{
222252
{Interval: 5 * time.Minute},
223253
{Interval: 1 * time.Hour},
224254
},
225255
Snapshot: SnapshotConfig{
226-
Interval: 24 * time.Hour,
227-
Retention: 24 * time.Hour,
256+
Interval: &defaultSnapshotInterval,
257+
Retention: &defaultSnapshotRetention,
228258
},
229259
}
230260
}
231261

262+
// Validate returns an error if config contains invalid settings.
263+
func (c *Config) Validate() error {
264+
// Validate snapshot intervals
265+
if c.Snapshot.Interval != nil && *c.Snapshot.Interval <= 0 {
266+
return &ConfigValidationError{
267+
Err: ErrInvalidSnapshotInterval,
268+
Field: "snapshot.interval",
269+
Value: *c.Snapshot.Interval,
270+
}
271+
}
272+
if c.Snapshot.Retention != nil && *c.Snapshot.Retention <= 0 {
273+
return &ConfigValidationError{
274+
Err: ErrInvalidSnapshotRetention,
275+
Field: "snapshot.retention",
276+
Value: *c.Snapshot.Retention,
277+
}
278+
}
279+
280+
// Validate compaction level intervals
281+
for i, level := range c.Levels {
282+
if level.Interval <= 0 {
283+
return &ConfigValidationError{
284+
Err: ErrInvalidCompactionInterval,
285+
Field: fmt.Sprintf("levels[%d].interval", i),
286+
Value: level.Interval,
287+
}
288+
}
289+
}
290+
291+
// Validate database configs
292+
for _, db := range c.DBs {
293+
// Validate sync intervals for replicas
294+
if db.Replica != nil && db.Replica.SyncInterval != nil && *db.Replica.SyncInterval <= 0 {
295+
return &ConfigValidationError{
296+
Err: ErrInvalidSyncInterval,
297+
Field: fmt.Sprintf("dbs[%s].replica.sync-interval", db.Path),
298+
Value: *db.Replica.SyncInterval,
299+
}
300+
}
301+
for i, replica := range db.Replicas {
302+
if replica.SyncInterval != nil && *replica.SyncInterval <= 0 {
303+
return &ConfigValidationError{
304+
Err: ErrInvalidSyncInterval,
305+
Field: fmt.Sprintf("dbs[%s].replicas[%d].sync-interval", db.Path, i),
306+
Value: *replica.SyncInterval,
307+
}
308+
}
309+
}
310+
}
311+
312+
return nil
313+
}
314+
232315
// CompactionLevels returns a full list of compaction levels include L0.
233316
func (c *Config) CompactionLevels() litestream.CompactionLevels {
234317
levels := litestream.CompactionLevels{
@@ -255,22 +338,46 @@ func (c *Config) DBConfig(path string) *DBConfig {
255338
return nil
256339
}
257340

258-
// ReadConfigFile unmarshals config from filename. Expands path if needed.
259-
// If expandEnv is true then environment variables are expanded in the config.
260-
func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
261-
config := DefaultConfig()
262-
341+
// OpenConfigFile opens a configuration file and returns a reader.
342+
// Expands the filename path if needed.
343+
func OpenConfigFile(filename string) (io.ReadCloser, error) {
263344
// Expand filename, if necessary.
264-
filename, err = expand(filename)
345+
filename, err := expand(filename)
265346
if err != nil {
266-
return config, err
347+
return nil, err
267348
}
268349

269-
// Read configuration.
270-
buf, err := os.ReadFile(filename)
350+
// Open configuration file.
351+
f, err := os.Open(filename)
271352
if os.IsNotExist(err) {
272-
return config, fmt.Errorf("config file not found: %s", filename)
353+
return nil, fmt.Errorf("%w: %s", ErrConfigFileNotFound, filename)
273354
} else if err != nil {
355+
return nil, err
356+
}
357+
358+
return f, nil
359+
}
360+
361+
// ReadConfigFile unmarshals config from filename. Expands path if needed.
362+
// If expandEnv is true then environment variables are expanded in the config.
363+
func ReadConfigFile(filename string, expandEnv bool) (Config, error) {
364+
f, err := OpenConfigFile(filename)
365+
if err != nil {
366+
return DefaultConfig(), err
367+
}
368+
defer f.Close()
369+
370+
return ParseConfig(f, expandEnv)
371+
}
372+
373+
// ParseConfig unmarshals config from a reader.
374+
// If expandEnv is true then environment variables are expanded in the config.
375+
func ParseConfig(r io.Reader, expandEnv bool) (_ Config, err error) {
376+
config := DefaultConfig()
377+
378+
// Read configuration.
379+
buf, err := io.ReadAll(r)
380+
if err != nil {
274381
return config, err
275382
}
276383

@@ -279,10 +386,22 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
279386
buf = []byte(os.ExpandEnv(string(buf)))
280387
}
281388

389+
// Save defaults before unmarshaling
390+
defaultSnapshotInterval := config.Snapshot.Interval
391+
defaultSnapshotRetention := config.Snapshot.Retention
392+
282393
if err := yaml.Unmarshal(buf, &config); err != nil {
283394
return config, err
284395
}
285396

397+
// Restore defaults if they were overwritten with nil by empty YAML sections
398+
if config.Snapshot.Interval == nil {
399+
config.Snapshot.Interval = defaultSnapshotInterval
400+
}
401+
if config.Snapshot.Retention == nil {
402+
config.Snapshot.Retention = defaultSnapshotRetention
403+
}
404+
286405
// Normalize paths.
287406
for _, dbConfig := range config.DBs {
288407
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
@@ -293,6 +412,11 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
293412
// Propage settings from global config to replica configs.
294413
config.propagateGlobalSettings()
295414

415+
// Validate configuration
416+
if err := config.Validate(); err != nil {
417+
return config, err
418+
}
419+
296420
// Configure logging.
297421
logOutput := os.Stdout
298422
if config.Logging.Stderr {

0 commit comments

Comments
 (0)