Skip to content

Commit 8f4a3e6

Browse files
committed
Add compressor interface
1 parent 77b61ab commit 8f4a3e6

File tree

94 files changed

+3316
-1958
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

94 files changed

+3316
-1958
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -877,6 +877,7 @@ set(SOURCES
877877
util/comparator.cc
878878
util/compression.cc
879879
util/compression_context_cache.cc
880+
util/compressor.cc
880881
util/concurrent_task_limiter_impl.cc
881882
util/crc32c.cc
882883
util/dynamic_bloom.cc
@@ -1435,6 +1436,7 @@ if(WITH_TESTS)
14351436
util/autovector_test.cc
14361437
util/bloom_test.cc
14371438
util/coding_test.cc
1439+
util/compression_test.cc
14381440
util/crc32c_test.cc
14391441
util/defer_test.cc
14401442
util/dynamic_bloom_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1375,6 +1375,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
13751375
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
13761376
$(AM_LINK)
13771377

1378+
compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
1379+
$(AM_LINK)
1380+
13781381
hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
13791382
$(AM_LINK)
13801383

TARGETS

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -244,6 +244,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
244244
"util/comparator.cc",
245245
"util/compression.cc",
246246
"util/compression_context_cache.cc",
247+
"util/compressor.cc",
247248
"util/concurrent_task_limiter_impl.cc",
248249
"util/crc32c.cc",
249250
"util/crc32c_arm64.cc",
@@ -586,6 +587,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
586587
"util/comparator.cc",
587588
"util/compression.cc",
588589
"util/compression_context_cache.cc",
590+
"util/compressor.cc",
589591
"util/concurrent_task_limiter_impl.cc",
590592
"util/crc32c.cc",
591593
"util/crc32c_arm64.cc",
@@ -4994,6 +4996,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
49944996
extra_compiler_flags=[])
49954997

49964998

4999+
cpp_unittest_wrapper(name="compression_test",
5000+
srcs=["util/compression_test.cc"],
5001+
deps=[":rocksdb_test_lib"],
5002+
extra_compiler_flags=[])
5003+
5004+
49975005
cpp_unittest_wrapper(name="configurable_test",
49985006
srcs=["options/configurable_test.cc"],
49995007
deps=[":rocksdb_test_lib"],

cache/compressed_secondary_cache.cc

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -75,15 +75,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
7575
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
7676
create_context, allocator, &value, &charge);
7777
} else {
78-
UncompressionContext uncompression_context(cache_options_.compression_type);
79-
UncompressionInfo uncompression_info(uncompression_context,
80-
UncompressionDict::GetEmptyDict(),
81-
cache_options_.compression_type);
78+
auto compressor =
79+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
80+
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
81+
cache_options_.compress_format_version,
82+
allocator);
8283

8384
size_t uncompressed_size{0};
84-
CacheAllocationPtr uncompressed = UncompressData(
85-
uncompression_info, (char*)ptr->get(), handle_value_charge,
86-
&uncompressed_size, cache_options_.compress_format_version, allocator);
85+
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
86+
compressor.get(), (char*)ptr->get(), handle_value_charge,
87+
&uncompressed_size);
8788

8889
if (!uncompressed) {
8990
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
@@ -145,16 +146,15 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
145146
std::string compressed_val;
146147
if (cache_options_.compression_type != kNoCompression) {
147148
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
148-
CompressionOptions compression_opts;
149-
CompressionContext compression_context(cache_options_.compression_type);
149+
auto compressor =
150+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
150151
uint64_t sample_for_compression{0};
151-
CompressionInfo compression_info(
152-
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
153-
cache_options_.compression_type, sample_for_compression);
152+
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
153+
cache_options_.compress_format_version,
154+
sample_for_compression);
154155

155156
bool success =
156-
CompressData(val, compression_info,
157-
cache_options_.compress_format_version, &compressed_val);
157+
compression_info.CompressData(compressor.get(), val, &compressed_val);
158158

159159
if (!success) {
160160
return Status::Corruption("Error compressing value.");

db/arena_wrapped_db_iter.cc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/options.h"
1616
#include "table/internal_iterator.h"
1717
#include "table/iterator_wrapper.h"
18+
#include "util/string_util.h"
1819
#include "util/user_comparator_wrapper.h"
1920

2021
namespace ROCKSDB_NAMESPACE {

db/blob/blob_file_builder.cc

Lines changed: 7 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
6666
immutable_options_(immutable_options),
6767
min_blob_size_(mutable_cf_options->min_blob_size),
6868
blob_file_size_(mutable_cf_options->blob_file_size),
69-
blob_compression_type_(mutable_cf_options->blob_compression_type),
69+
blob_compressor_(mutable_cf_options->blob_compressor),
7070
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
7171
file_options_(file_options),
7272
db_id_(std::move(db_id)),
@@ -150,7 +150,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
150150
}
151151

152152
BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
153-
blob_compression_type_);
153+
blob_compressor_->GetCompressionType());
154154

155155
return Status::OK();
156156
}
@@ -227,7 +227,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
227227
constexpr bool has_ttl = false;
228228
constexpr ExpirationRange expiration_range;
229229

230-
BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
230+
BlobLogHeader header(column_family_id_,
231+
blob_compressor_->GetCompressionType(), has_ttl,
231232
expiration_range);
232233

233234
{
@@ -255,26 +256,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
255256
assert(compressed_blob->empty());
256257
assert(immutable_options_);
257258

258-
if (blob_compression_type_ == kNoCompression) {
259+
if (blob_compressor_->GetCompressionType() == kNoCompression) {
259260
return Status::OK();
260261
}
261262

262-
CompressionOptions opts;
263-
CompressionContext context(blob_compression_type_);
264-
constexpr uint64_t sample_for_compression = 0;
265-
266-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
267-
blob_compression_type_, sample_for_compression);
268-
269-
constexpr uint32_t compression_format_version = 2;
263+
CompressionInfo info;
270264

271265
bool success = false;
272266

273267
{
274268
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
275269
BLOB_DB_COMPRESSION_MICROS);
276-
success =
277-
CompressData(*blob, info, compression_format_version, compressed_blob);
270+
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
278271
}
279272

280273
if (!success) {

db/blob/blob_file_builder.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/env.h"
1616
#include "rocksdb/rocksdb_namespace.h"
1717
#include "rocksdb/types.h"
18+
#include "util/compressor.h"
1819

1920
namespace ROCKSDB_NAMESPACE {
2021

@@ -89,7 +90,7 @@ class BlobFileBuilder {
8990
const ImmutableOptions* immutable_options_;
9091
uint64_t min_blob_size_;
9192
uint64_t blob_file_size_;
92-
CompressionType blob_compression_type_;
93+
std::shared_ptr<Compressor> blob_compressor_;
9394
PrepopulateBlobCache prepopulate_blob_cache_;
9495
const FileOptions* file_options_;
9596
const std::string db_id_;

db/blob/blob_file_builder_test.cc

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -406,15 +406,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
406406
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
407407

408408
CompressionOptions opts;
409-
CompressionContext context(kSnappyCompression);
410-
constexpr uint64_t sample_for_compression = 0;
411-
412-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
413-
kSnappyCompression, sample_for_compression);
409+
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
410+
ASSERT_NE(compressor, nullptr);
414411

415412
std::string compressed_value;
416-
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
417-
uncompressed_value.size(), &compressed_value));
413+
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
414+
&compressed_value));
418415

419416
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
420417
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());

db/blob/blob_file_reader.cc

Lines changed: 20 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -49,11 +49,11 @@ Status BlobFileReader::Create(
4949

5050
Statistics* const statistics = immutable_options.stats;
5151

52-
CompressionType compression_type = kNoCompression;
52+
std::shared_ptr<Compressor> compressor;
5353

5454
{
5555
const Status s = ReadHeader(file_reader.get(), column_family_id, statistics,
56-
&compression_type);
56+
&compressor);
5757
if (!s.ok()) {
5858
return s;
5959
}
@@ -67,7 +67,7 @@ Status BlobFileReader::Create(
6767
}
6868

6969
blob_file_reader->reset(
70-
new BlobFileReader(std::move(file_reader), file_size, compression_type,
70+
new BlobFileReader(std::move(file_reader), file_size, compressor,
7171
immutable_options.clock, statistics));
7272

7373
return Status::OK();
@@ -136,9 +136,9 @@ Status BlobFileReader::OpenFile(
136136
Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
137137
uint32_t column_family_id,
138138
Statistics* statistics,
139-
CompressionType* compression_type) {
139+
std::shared_ptr<Compressor>* compressor) {
140140
assert(file_reader);
141-
assert(compression_type);
141+
assert(compressor);
142142

143143
Slice header_slice;
144144
Buffer buf;
@@ -181,7 +181,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
181181
return Status::Corruption("Column family ID mismatch");
182182
}
183183

184-
*compression_type = header.compression;
184+
*compressor = BuiltinCompressor::GetCompressor(header.compression);
185185

186186
return Status::OK();
187187
}
@@ -272,11 +272,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
272272

273273
BlobFileReader::BlobFileReader(
274274
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
275-
CompressionType compression_type, SystemClock* clock,
275+
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
276276
Statistics* statistics)
277277
: file_reader_(std::move(file_reader)),
278278
file_size_(file_size),
279-
compression_type_(compression_type),
279+
compressor_(compressor),
280280
clock_(clock),
281281
statistics_(statistics) {
282282
assert(file_reader_);
@@ -286,7 +286,7 @@ BlobFileReader::~BlobFileReader() = default;
286286

287287
Status BlobFileReader::GetBlob(
288288
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
289-
uint64_t value_size, CompressionType compression_type,
289+
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
290290
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
291291
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
292292
assert(result);
@@ -297,7 +297,7 @@ Status BlobFileReader::GetBlob(
297297
return Status::Corruption("Invalid blob offset");
298298
}
299299

300-
if (compression_type != compression_type_) {
300+
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
301301
return Status::Corruption("Compression type mismatch when reading blob");
302302
}
303303

@@ -361,7 +361,7 @@ Status BlobFileReader::GetBlob(
361361

362362
{
363363
const Status s = UncompressBlobIfNeeded(
364-
value_slice, compression_type, allocator, clock_, statistics_, result);
364+
value_slice, compressor.get(), allocator, clock_, statistics_, result);
365365
if (!s.ok()) {
366366
return s;
367367
}
@@ -407,7 +407,8 @@ void BlobFileReader::MultiGetBlob(
407407
*req->status = Status::Corruption("Invalid blob offset");
408408
continue;
409409
}
410-
if (req->compression != compression_type_) {
410+
if (req->compressor->GetCompressionType() !=
411+
compressor_->GetCompressionType()) {
411412
*req->status =
412413
Status::Corruption("Compression type mismatch when reading a blob");
413414
continue;
@@ -506,7 +507,7 @@ void BlobFileReader::MultiGetBlob(
506507
// Uncompress blob if needed
507508
Slice value_slice(record_slice.data() + adjustments[i], req->len);
508509
*req->status =
509-
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
510+
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
510511
clock_, statistics_, &blob_reqs[i].second);
511512
if (req->status->ok()) {
512513
total_bytes += record_slice.size();
@@ -563,31 +564,28 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
563564
}
564565

565566
Status BlobFileReader::UncompressBlobIfNeeded(
566-
const Slice& value_slice, CompressionType compression_type,
567+
const Slice& value_slice, Compressor* compressor,
567568
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
568569
std::unique_ptr<BlobContents>* result) {
570+
assert(compressor);
569571
assert(result);
570572

571-
if (compression_type == kNoCompression) {
573+
if (compressor->GetCompressionType() == kNoCompression) {
572574
BlobContentsCreator::Create(result, nullptr, value_slice, allocator);
573575
return Status::OK();
574576
}
575577

576-
UncompressionContext context(compression_type);
577-
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
578-
compression_type);
578+
UncompressionInfo info;
579579

580580
size_t uncompressed_size = 0;
581-
constexpr uint32_t compression_format_version = 2;
582581

583582
CacheAllocationPtr output;
584583

585584
{
586585
PERF_TIMER_GUARD(blob_decompress_time);
587586
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
588-
output = UncompressData(info, value_slice.data(), value_slice.size(),
589-
&uncompressed_size, compression_format_version,
590-
allocator);
587+
output = info.UncompressData(compressor, value_slice.data(),
588+
value_slice.size(), &uncompressed_size);
591589
}
592590

593591
TEST_SYNC_POINT_CALLBACK(

0 commit comments

Comments
 (0)