4
4
"context"
5
5
"encoding/binary"
6
6
"fmt"
7
+ "hash/crc64"
7
8
"io"
8
9
"io/ioutil"
9
10
"log"
@@ -31,10 +32,10 @@ type Replica interface {
31
32
DB () * DB
32
33
33
34
// Starts replicating in a background goroutine.
34
- Start (ctx context.Context )
35
+ Start (ctx context.Context ) error
35
36
36
37
// Stops all replication processing. Blocks until processing stopped.
37
- Stop ()
38
+ Stop (hard bool ) error
38
39
39
40
// Returns the last replication position.
40
41
LastPos () Pos
@@ -90,6 +91,9 @@ type FileReplica struct {
90
91
mu sync.RWMutex
91
92
pos Pos // last position
92
93
94
+ muf sync.Mutex
95
+ f * os.File // long-running file descriptor to avoid non-OFD lock issues
96
+
93
97
wg sync.WaitGroup
94
98
cancel func ()
95
99
@@ -392,14 +396,19 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
392
396
}
393
397
394
398
// Start starts replication for a given generation.
395
- func (r * FileReplica ) Start (ctx context.Context ) {
399
+ func (r * FileReplica ) Start (ctx context.Context ) ( err error ) {
396
400
// Ignore if replica is being used synchronously.
397
401
if ! r .MonitorEnabled {
398
- return
402
+ return nil
399
403
}
400
404
401
405
// Stop previous replication.
402
- r .Stop ()
406
+ r .Stop (false )
407
+
408
+ // Open db file descriptor.
409
+ if r .f , err = os .Open (r .db .Path ()); err != nil {
410
+ return err
411
+ }
403
412
404
413
// Wrap context with cancelation.
405
414
ctx , r .cancel = context .WithCancel (ctx )
@@ -410,12 +419,27 @@ func (r *FileReplica) Start(ctx context.Context) {
410
419
go func () { defer r .wg .Done (); r .retainer (ctx ) }()
411
420
go func () { defer r .wg .Done (); r .snapshotter (ctx ) }()
412
421
go func () { defer r .wg .Done (); r .validator (ctx ) }()
422
+
423
+ return nil
413
424
}
414
425
415
426
// Stop cancels any outstanding replication and blocks until finished.
416
- func (r * FileReplica ) Stop () {
427
+ //
428
+ // Performing a hard stop will close the DB file descriptor, which could release
429
+ // per-process locks on the database file. Hard stops should only be performed when
430
+ // stopping the entire process.
431
+ func (r * FileReplica ) Stop (hard bool ) (err error ) {
417
432
r .cancel ()
418
433
r .wg .Wait ()
434
+
435
+ r .muf .Lock ()
436
+ defer r .muf .Unlock ()
437
+ if hard && r .f != nil {
438
+ if e := r .f .Close (); e != nil && err == nil {
439
+ err = e
440
+ }
441
+ }
442
+ return err
419
443
}
420
444
421
445
// monitor runs in a separate goroutine and continuously replicates the DB.
@@ -582,6 +606,9 @@ func (r *FileReplica) Snapshot(ctx context.Context) error {
582
606
583
607
// snapshot copies the entire database to the replica path.
584
608
func (r * FileReplica ) snapshot (ctx context.Context , generation string , index int ) error {
609
+ r .muf .Lock ()
610
+ defer r .muf .Unlock ()
611
+
585
612
// Acquire a read lock on the database during snapshot to prevent checkpoints.
586
613
tx , err := r .db .db .Begin ()
587
614
if err != nil {
@@ -602,7 +629,39 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
602
629
603
630
if err := mkdirAll (filepath .Dir (snapshotPath ), r .db .dirmode , r .db .diruid , r .db .dirgid ); err != nil {
604
631
return err
605
- } else if err := compressFile (r .db .Path (), snapshotPath , r .db .uid , r .db .gid ); err != nil {
632
+ }
633
+
634
+ if _ , err := r .f .Seek (0 , io .SeekStart ); err != nil {
635
+ return err
636
+ }
637
+
638
+ fi , err := r .f .Stat ()
639
+ if err != nil {
640
+ return err
641
+ }
642
+
643
+ w , err := createFile (snapshotPath + ".tmp" , fi .Mode (), r .db .uid , r .db .gid )
644
+ if err != nil {
645
+ return err
646
+ }
647
+ defer w .Close ()
648
+
649
+ zr := lz4 .NewWriter (w )
650
+ defer zr .Close ()
651
+
652
+ // Copy & compress file contents to temporary file.
653
+ if _ , err := io .Copy (zr , r .f ); err != nil {
654
+ return err
655
+ } else if err := zr .Close (); err != nil {
656
+ return err
657
+ } else if err := w .Sync (); err != nil {
658
+ return err
659
+ } else if err := w .Close (); err != nil {
660
+ return err
661
+ }
662
+
663
+ // Move compressed file to final location.
664
+ if err := os .Rename (snapshotPath + ".tmp" , snapshotPath ); err != nil {
606
665
return err
607
666
}
608
667
@@ -805,7 +864,7 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
805
864
}
806
865
807
866
dst := filename + ".lz4"
808
- if err := compressFile (filename , dst , r .db .uid , r .db .gid ); err != nil {
867
+ if err := compressWALFile (filename , dst , r .db .uid , r .db .gid ); err != nil {
809
868
return err
810
869
} else if err := os .Remove (filename ); err != nil {
811
870
return err
@@ -1051,8 +1110,9 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
1051
1110
return index , nil
1052
1111
}
1053
1112
1054
- // compressFile compresses a file and replaces it with a new file with a .lz4 extension.
1055
- func compressFile (src , dst string , uid , gid int ) error {
1113
+ // compressWALFile compresses a file and replaces it with a new file with a .lz4 extension.
1114
+ // Do not use this on database files because of issues with non-OFD locks.
1115
+ func compressWALFile (src , dst string , uid , gid int ) error {
1056
1116
r , err := os .Open (src )
1057
1117
if err != nil {
1058
1118
return err
@@ -1102,7 +1162,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
1102
1162
1103
1163
// Compute checksum of primary database under lock. This prevents a
1104
1164
// sync from occurring and the database will not be written.
1105
- primaryPath := filepath .Join (tmpdir , "primary" )
1106
1165
chksum0 , pos , err := db .CRC64 ()
1107
1166
if err != nil {
1108
1167
return fmt .Errorf ("cannot compute checksum: %w" , err )
@@ -1125,10 +1184,19 @@ func ValidateReplica(ctx context.Context, r Replica) error {
1125
1184
}
1126
1185
1127
1186
// Open file handle for restored database.
1128
- chksum1 , err := checksumFile (restorePath )
1187
+ // NOTE: This open is ok as the restored database is not managed by litestream.
1188
+ f , err := os .Open (restorePath )
1129
1189
if err != nil {
1130
1190
return err
1131
1191
}
1192
+ defer f .Close ()
1193
+
1194
+ // Read entire file into checksum.
1195
+ h := crc64 .New (crc64 .MakeTable (crc64 .ISO ))
1196
+ if _ , err := io .Copy (h , f ); err != nil {
1197
+ return err
1198
+ }
1199
+ chksum1 := h .Sum64 ()
1132
1200
1133
1201
status := "ok"
1134
1202
mismatch := chksum0 != chksum1
@@ -1140,15 +1208,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
1140
1208
// Validate checksums match.
1141
1209
if mismatch {
1142
1210
internal .ReplicaValidationTotalCounterVec .WithLabelValues (db .Path (), r .Name (), "error" ).Inc ()
1143
-
1144
- // Compress mismatched databases and report temporary path for investigation.
1145
- if err := compressFile (primaryPath , primaryPath + ".lz4" , db .uid , db .gid ); err != nil {
1146
- return fmt .Errorf ("cannot compress primary db: %w" , err )
1147
- } else if err := compressFile (restorePath , restorePath + ".lz4" , db .uid , db .gid ); err != nil {
1148
- return fmt .Errorf ("cannot compress replica db: %w" , err )
1149
- }
1150
- log .Printf ("%s(%s): validator: mismatch files @ %s" , db .Path (), r .Name (), tmpdir )
1151
-
1152
1211
return ErrChecksumMismatch
1153
1212
}
1154
1213
0 commit comments