Skip to content

Commit 77b61ab

Browse files
anand76 and facebook-github-bot
authored and committed
Fix bug in WAL streaming uncompression (#11198)
Summary: Fix a bug in the calculation of the input buffer address/offset in log_reader.cc. The bug occurs when consecutive fragments of a compressed record are located at the same offset in the log reader buffer: the second fragment's input buffer is then treated as a leftover from the previous input buffer. As a result, the offset in the `ZSTD_inBuffer` is not reset. Pull Request resolved: #11198 Test Plan: Add a unit test in log_test.cc that fails without the fix and passes with it. Reviewed By: ajkr, cbi42 Differential Revision: D43102692 Pulled By: anand1976 fbshipit-source-id: aa2648f4802c33991b76a3233c5a58d4cc9e77fd
1 parent 876d281 commit 77b61ab

File tree

5 files changed

+51
-12
lines changed

5 files changed

+51
-12
lines changed

HISTORY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
1313
* Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
1414
* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
15+
* Fixed a bug in DB open/recovery from a compressed WAL that was caused by incorrect handling of certain record fragments with the same offset within a WAL block.
1516

1617
### Feature Removal
1718
* Remove RocksDB Lite.

db/log_reader.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -515,10 +515,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
515515

516516
size_t uncompressed_size = 0;
517517
int remaining = 0;
518+
const char* input = header + header_size;
518519
do {
519-
remaining = uncompress_->Uncompress(header + header_size, length,
520-
uncompressed_buffer_.get(),
521-
&uncompressed_size);
520+
remaining = uncompress_->Uncompress(
521+
input, length, uncompressed_buffer_.get(), &uncompressed_size);
522+
input = nullptr;
522523
if (remaining < 0) {
523524
buffer_.clear();
524525
return kBadRecord;
@@ -830,10 +831,11 @@ bool FragmentBufferedReader::TryReadFragment(
830831
uncompressed_record_.clear();
831832
size_t uncompressed_size = 0;
832833
int remaining = 0;
834+
const char* input = header + header_size;
833835
do {
834-
remaining = uncompress_->Uncompress(header + header_size, length,
835-
uncompressed_buffer_.get(),
836-
&uncompressed_size);
836+
remaining = uncompress_->Uncompress(
837+
input, length, uncompressed_buffer_.get(), &uncompressed_size);
838+
input = nullptr;
837839
if (remaining < 0) {
838840
buffer_.clear();
839841
*fragment_type_or_err = kBadRecord;

db/log_test.cc

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,38 @@ TEST_P(CompressionLogTest, Fragmentation) {
979979
ASSERT_EQ("EOF", Read());
980980
}
981981

982+
TEST_P(CompressionLogTest, AlignedFragmentation) {
983+
CompressionType compression_type = std::get<2>(GetParam());
984+
if (!StreamingCompressionTypeSupported(compression_type)) {
985+
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
986+
return;
987+
}
988+
ASSERT_OK(SetupTestEnv());
989+
Random rnd(301);
990+
int num_filler_records = 0;
991+
// Keep writing small records until the next record will be aligned at the
992+
// beginning of the block.
993+
while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
994+
char entry = 'a';
995+
ASSERT_OK(writer_->AddRecord(Slice(&entry, 1)));
996+
num_filler_records++;
997+
}
998+
const std::vector<std::string> wal_entries = {
999+
rnd.RandomBinaryString(3 * kBlockSize),
1000+
};
1001+
for (const std::string& wal_entry : wal_entries) {
1002+
Write(wal_entry);
1003+
}
1004+
1005+
for (int i = 0; i < num_filler_records; ++i) {
1006+
ASSERT_EQ("a", Read());
1007+
}
1008+
for (const std::string& wal_entry : wal_entries) {
1009+
ASSERT_EQ(wal_entry, Read());
1010+
}
1011+
ASSERT_EQ("EOF", Read());
1012+
}
1013+
9821014
INSTANTIATE_TEST_CASE_P(
9831015
Compression, CompressionLogTest,
9841016
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
@@ -1026,10 +1058,11 @@ TEST_P(StreamingCompressionTest, Basic) {
10261058
for (int i = 0; i < (int)compressed_buffers.size(); i++) {
10271059
// Call uncompress till either the entire input is consumed or the output
10281060
// buffer size is equal to the allocated output buffer size.
1061+
const char* input = compressed_buffers[i].c_str();
10291062
do {
1030-
ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
1031-
compressed_buffers[i].size(),
1063+
ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
10321064
uncompressed_output_buffer, &output_pos);
1065+
input = nullptr;
10331066
if (output_pos > 0) {
10341067
std::string uncompressed_fragment;
10351068
uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);

util/compression.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@ void ZSTDStreamingCompress::Reset() {
8585

8686
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
8787
char* output, size_t* output_pos) {
88-
assert(input != nullptr && output != nullptr && output_pos != nullptr);
88+
assert(output != nullptr && output_pos != nullptr);
8989
*output_pos = 0;
9090
// Don't need to uncompress an empty input
9191
if (input_size == 0) {
9292
return 0;
9393
}
9494
#ifdef ZSTD_STREAMING
95-
if (input_buffer_.src != input) {
95+
if (input) {
9696
// New input
9797
input_buffer_ = {input, input_size, /*pos=*/0};
9898
}

util/compression.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,8 +1711,11 @@ class StreamingUncompress {
17111711
compress_format_version_(compress_format_version),
17121712
max_output_len_(max_output_len) {}
17131713
virtual ~StreamingUncompress() = default;
1714-
// uncompress should be called again with the same input if output_size is
1715-
// equal to max_output_len or with the next input fragment.
1714+
// Uncompress can be called repeatedly to progressively process the same
1715+
// input buffer, or can be called with a new input buffer. When the input
1716+
// buffer is not fully consumed, the return value is > 0 or output_size
1717+
// == max_output_len. When calling uncompress to continue processing the
1718+
// same input buffer, the input argument should be nullptr.
17161719
// Parameters:
17171720
// input - buffer to uncompress
17181721
// input_size - size of input buffer

0 commit comments

Comments
 (0)