Skip to content

Commit d88100c

Browse files
committed
Add Compressor interface
1 parent bcbed0d commit d88100c

File tree

89 files changed

+3435
-2069
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+3435
-2069
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,7 @@ set(SOURCES
891891
util/comparator.cc
892892
util/compression.cc
893893
util/compression_context_cache.cc
894+
util/compressor.cc
894895
util/concurrent_task_limiter_impl.cc
895896
util/crc32c.cc
896897
util/data_structure.cc
@@ -1493,6 +1494,7 @@ if(WITH_TESTS)
14931494
util/autovector_test.cc
14941495
util/bloom_test.cc
14951496
util/coding_test.cc
1497+
util/compression_test.cc
14961498
util/crc32c_test.cc
14971499
util/defer_test.cc
14981500
util/dynamic_bloom_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
12841284
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
12851285
$(AM_LINK)
12861286

1287+
compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
1288+
$(AM_LINK)
1289+
12871290
hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
12881291
$(AM_LINK)
12891292

TARGETS

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
252252
"util/comparator.cc",
253253
"util/compression.cc",
254254
"util/compression_context_cache.cc",
255+
"util/compressor.cc",
255256
"util/concurrent_task_limiter_impl.cc",
256257
"util/crc32c.cc",
257258
"util/crc32c_arm64.cc",
@@ -4667,6 +4668,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
46674668
extra_compiler_flags=[])
46684669

46694670

4671+
cpp_unittest_wrapper(name="compression_test",
4672+
srcs=["util/compression_test.cc"],
4673+
deps=[":rocksdb_test_lib"],
4674+
extra_compiler_flags=[])
4675+
4676+
46704677
cpp_unittest_wrapper(name="configurable_test",
46714678
srcs=["options/configurable_test.cc"],
46724679
deps=[":rocksdb_test_lib"],

cache/compressed_secondary_cache.cc

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
7373
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
7474
create_context, allocator, &value, &charge);
7575
} else {
76-
UncompressionContext uncompression_context(cache_options_.compression_type);
77-
UncompressionInfo uncompression_info(uncompression_context,
78-
UncompressionDict::GetEmptyDict(),
79-
cache_options_.compression_type);
76+
auto compressor =
77+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
78+
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
79+
cache_options_.compress_format_version,
80+
allocator);
8081

8182
size_t uncompressed_size{0};
82-
CacheAllocationPtr uncompressed = UncompressData(
83-
uncompression_info, (char*)ptr->get(), handle_value_charge,
84-
&uncompressed_size, cache_options_.compress_format_version, allocator);
83+
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
84+
compressor.get(), (char*)ptr->get(), handle_value_charge,
85+
&uncompressed_size);
8586

8687
if (!uncompressed) {
8788
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
@@ -151,17 +152,15 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
151152
if (cache_options_.compression_type != kNoCompression &&
152153
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
153154
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
154-
CompressionOptions compression_opts;
155-
CompressionContext compression_context(cache_options_.compression_type,
156-
compression_opts);
155+
auto compressor =
156+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
157157
uint64_t sample_for_compression{0};
158-
CompressionInfo compression_info(
159-
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
160-
cache_options_.compression_type, sample_for_compression);
158+
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
159+
cache_options_.compress_format_version,
160+
sample_for_compression);
161161

162162
bool success =
163-
CompressData(val, compression_info,
164-
cache_options_.compress_format_version, &compressed_val);
163+
compression_info.CompressData(compressor.get(), val, &compressed_val);
165164

166165
if (!success) {
167166
return Status::Corruption("Error compressing value.");

db/arena_wrapped_db_iter.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/options.h"
1616
#include "table/internal_iterator.h"
1717
#include "table/iterator_wrapper.h"
18+
#include "util/string_util.h"
1819
#include "util/user_comparator_wrapper.h"
1920

2021
namespace ROCKSDB_NAMESPACE {

db/blob/blob_file_builder.cc

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
6666
immutable_options_(immutable_options),
6767
min_blob_size_(mutable_cf_options->min_blob_size),
6868
blob_file_size_(mutable_cf_options->blob_file_size),
69-
blob_compression_type_(mutable_cf_options->blob_compression_type),
69+
blob_compressor_(mutable_cf_options->blob_compressor),
7070
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
7171
file_options_(file_options),
7272
db_id_(std::move(db_id)),
@@ -91,6 +91,10 @@ BlobFileBuilder::BlobFileBuilder(
9191
assert(blob_file_paths_->empty());
9292
assert(blob_file_additions_);
9393
assert(blob_file_additions_->empty());
94+
95+
if (blob_compressor_ == nullptr) {
96+
blob_compressor_ = BuiltinCompressor::GetCompressor(kNoCompression);
97+
}
9498
}
9599

96100
BlobFileBuilder::~BlobFileBuilder() = default;
@@ -150,7 +154,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
150154
}
151155

152156
BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
153-
blob_compression_type_);
157+
blob_compressor_->GetCompressionType());
154158

155159
return Status::OK();
156160
}
@@ -227,7 +231,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
227231
constexpr bool has_ttl = false;
228232
constexpr ExpirationRange expiration_range;
229233

230-
BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
234+
BlobLogHeader header(column_family_id_,
235+
blob_compressor_->GetCompressionType(), has_ttl,
231236
expiration_range);
232237

233238
{
@@ -255,27 +260,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
255260
assert(compressed_blob->empty());
256261
assert(immutable_options_);
257262

258-
if (blob_compression_type_ == kNoCompression) {
263+
if (blob_compressor_->GetCompressionType() == kNoCompression) {
259264
return Status::OK();
260265
}
261266

262-
// TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
263-
CompressionOptions opts;
264-
CompressionContext context(blob_compression_type_, opts);
265-
constexpr uint64_t sample_for_compression = 0;
266-
267-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
268-
blob_compression_type_, sample_for_compression);
269-
270-
constexpr uint32_t compression_format_version = 2;
267+
CompressionInfo info;
271268

272269
bool success = false;
273270

274271
{
275272
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
276273
BLOB_DB_COMPRESSION_MICROS);
277-
success =
278-
CompressData(*blob, info, compression_format_version, compressed_blob);
274+
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
279275
}
280276

281277
if (!success) {

db/blob/blob_file_builder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/env.h"
1616
#include "rocksdb/rocksdb_namespace.h"
1717
#include "rocksdb/types.h"
18+
#include "util/compressor.h"
1819

1920
namespace ROCKSDB_NAMESPACE {
2021

@@ -89,7 +90,7 @@ class BlobFileBuilder {
8990
const ImmutableOptions* immutable_options_;
9091
uint64_t min_blob_size_;
9192
uint64_t blob_file_size_;
92-
CompressionType blob_compression_type_;
93+
std::shared_ptr<Compressor> blob_compressor_;
9394
PrepopulateBlobCache prepopulate_blob_cache_;
9495
const FileOptions* file_options_;
9596
const std::string db_id_;

db/blob/blob_file_builder_test.cc

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -405,16 +405,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
405405
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
406406
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
407407

408-
CompressionOptions opts;
409-
CompressionContext context(kSnappyCompression, opts);
410-
constexpr uint64_t sample_for_compression = 0;
411-
412-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
413-
kSnappyCompression, sample_for_compression);
408+
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
409+
ASSERT_NE(compressor, nullptr);
414410

415411
std::string compressed_value;
416-
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
417-
uncompressed_value.size(), &compressed_value));
412+
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
413+
&compressed_value));
418414

419415
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
420416
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());

db/blob/blob_file_reader.cc

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,11 @@ Status BlobFileReader::Create(
5050

5151
Statistics* const statistics = immutable_options.stats;
5252

53-
CompressionType compression_type = kNoCompression;
53+
std::shared_ptr<Compressor> compressor;
5454

5555
{
56-
const Status s =
57-
ReadHeader(file_reader.get(), read_options, column_family_id,
58-
statistics, &compression_type);
56+
const Status s = ReadHeader(file_reader.get(), read_options,
57+
column_family_id, statistics, &compressor);
5958
if (!s.ok()) {
6059
return s;
6160
}
@@ -70,7 +69,7 @@ Status BlobFileReader::Create(
7069
}
7170

7271
blob_file_reader->reset(
73-
new BlobFileReader(std::move(file_reader), file_size, compression_type,
72+
new BlobFileReader(std::move(file_reader), file_size, compressor,
7473
immutable_options.clock, statistics));
7574

7675
return Status::OK();
@@ -140,9 +139,9 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
140139
const ReadOptions& read_options,
141140
uint32_t column_family_id,
142141
Statistics* statistics,
143-
CompressionType* compression_type) {
142+
std::shared_ptr<Compressor>* compressor) {
144143
assert(file_reader);
145-
assert(compression_type);
144+
assert(compressor);
146145

147146
Slice header_slice;
148147
Buffer buf;
@@ -184,7 +183,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
184183
return Status::Corruption("Column family ID mismatch");
185184
}
186185

187-
*compression_type = header.compression;
186+
*compressor = BuiltinCompressor::GetCompressor(header.compression);
188187

189188
return Status::OK();
190189
}
@@ -281,11 +280,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
281280

282281
BlobFileReader::BlobFileReader(
283282
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
284-
CompressionType compression_type, SystemClock* clock,
283+
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
285284
Statistics* statistics)
286285
: file_reader_(std::move(file_reader)),
287286
file_size_(file_size),
288-
compression_type_(compression_type),
287+
compressor_(compressor),
289288
clock_(clock),
290289
statistics_(statistics) {
291290
assert(file_reader_);
@@ -295,7 +294,7 @@ BlobFileReader::~BlobFileReader() = default;
295294

296295
Status BlobFileReader::GetBlob(
297296
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
298-
uint64_t value_size, CompressionType compression_type,
297+
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
299298
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
300299
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
301300
assert(result);
@@ -306,7 +305,7 @@ Status BlobFileReader::GetBlob(
306305
return Status::Corruption("Invalid blob offset");
307306
}
308307

309-
if (compression_type != compression_type_) {
308+
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
310309
return Status::Corruption("Compression type mismatch when reading blob");
311310
}
312311

@@ -374,7 +373,7 @@ Status BlobFileReader::GetBlob(
374373

375374
{
376375
const Status s = UncompressBlobIfNeeded(
377-
value_slice, compression_type, allocator, clock_, statistics_, result);
376+
value_slice, compressor.get(), allocator, clock_, statistics_, result);
378377
if (!s.ok()) {
379378
return s;
380379
}
@@ -420,7 +419,8 @@ void BlobFileReader::MultiGetBlob(
420419
*req->status = Status::Corruption("Invalid blob offset");
421420
continue;
422421
}
423-
if (req->compression != compression_type_) {
422+
if (req->compressor->GetCompressionType() !=
423+
compressor_->GetCompressionType()) {
424424
*req->status =
425425
Status::Corruption("Compression type mismatch when reading a blob");
426426
continue;
@@ -522,7 +522,7 @@ void BlobFileReader::MultiGetBlob(
522522
// Uncompress blob if needed
523523
Slice value_slice(record_slice.data() + adjustments[i], req->len);
524524
*req->status =
525-
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
525+
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
526526
clock_, statistics_, &blob_reqs[i].second);
527527
if (req->status->ok()) {
528528
total_bytes += record_slice.size();
@@ -579,31 +579,28 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
579579
}
580580

581581
Status BlobFileReader::UncompressBlobIfNeeded(
582-
const Slice& value_slice, CompressionType compression_type,
582+
const Slice& value_slice, Compressor* compressor,
583583
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
584584
std::unique_ptr<BlobContents>* result) {
585+
assert(compressor);
585586
assert(result);
586587

587-
if (compression_type == kNoCompression) {
588+
if (compressor->GetCompressionType() == kNoCompression) {
588589
BlobContentsCreator::Create(result, nullptr, value_slice, allocator);
589590
return Status::OK();
590591
}
591592

592-
UncompressionContext context(compression_type);
593-
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
594-
compression_type);
593+
UncompressionInfo info;
595594

596595
size_t uncompressed_size = 0;
597-
constexpr uint32_t compression_format_version = 2;
598596

599597
CacheAllocationPtr output;
600598

601599
{
602600
PERF_TIMER_GUARD(blob_decompress_time);
603601
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
604-
output = UncompressData(info, value_slice.data(), value_slice.size(),
605-
&uncompressed_size, compression_format_version,
606-
allocator);
602+
output = info.UncompressData(compressor, value_slice.data(),
603+
value_slice.size(), &uncompressed_size);
607604
}
608605

609606
TEST_SYNC_POINT_CALLBACK(

0 commit comments

Comments
 (0)