diff --git a/db/builder.cc b/db/builder.cc index 4a07aeeb2096c6dc1b38c0d1f8714592019eb9f6..1fc200930073bd5c1452b0ccbe77f3d8e8e8929e 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -32,7 +32,7 @@ Status BuildTable(const std::string& dbname, return s; } - TableBuilder* builder = new TableBuilder(options, file); + TableBuilder* builder = new TableBuilder(options, file, 0); meta->smallest.DecodeFrom(iter->key()); for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); diff --git a/db/db_bench.cc b/db/db_bench.cc index d298ac29e9a1035ead94fd29d304f2b2b830f4c9..22397a2fcebe2b4954f0f232f86d8ad44c17e748 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -179,6 +179,10 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; +// Allows compression for levels 0 and 1 to be disabled when +// other levels are compressed +static int FLAGS_min_level_to_compress = -1; + // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -913,6 +917,17 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; + if (FLAGS_min_level_to_compress >= 0) { + assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); + options.compression_per_level = new CompressionType[FLAGS_num_levels]; + for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (unsigned int i = FLAGS_min_level_to_compress; + i < FLAGS_num_levels; i++) { + options.compression_per_level[i] = FLAGS_compression_type; + } + } options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; @@ -922,6 +937,9 @@ class Benchmark { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } + if (FLAGS_min_level_to_compress >= 0) { + delete 
options.compression_per_level; + } } void WriteSeq(ThreadState* thread) { @@ -1321,6 +1339,9 @@ int main(int argc, char** argv) { else { fprintf(stdout, "Cannot parse %s\n", argv[i]); } + } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 + && n >= 0) { + FLAGS_min_level_to_compress = n; } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_disable_seek_compaction = n; diff --git a/db/db_impl.cc b/db/db_impl.cc index f179e77dc30d8701cfc0565a6c81a9247e80d38a..211010990797961edaac85b5b61377c1c40e7cdd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -156,6 +156,12 @@ Options SanitizeOptions(const std::string& dbname, if (result.block_cache == NULL) { result.block_cache = NewLRUCache(8 << 20); } + if (src.compression_per_level != NULL) { + result.compression_per_level = new CompressionType[src.num_levels]; + for (unsigned int i = 0; i < src.num_levels; i++) { + result.compression_per_level[i] = src.compression_per_level[i]; + } + } return result; } @@ -246,6 +252,9 @@ DBImpl::~DBImpl() { if (owns_cache_) { delete options_.block_cache; } + if (options_.compression_per_level != NULL) { + delete options_.compression_per_level; + } delete logger_; } @@ -961,7 +970,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { std::string fname = TableFileName(dbname_, file_number); Status s = env_->NewWritableFile(fname, &compact->outfile); if (s.ok()) { - compact->builder = new TableBuilder(options_, compact->outfile); + compact->builder = new TableBuilder(options_, compact->outfile, + compact->compaction->level() + 1); } return s; } diff --git a/db/db_test.cc b/db/db_test.cc index daea3aaaf15e8bb507a87ec414dbccb15f63f66e..e5a8f87118e59f20f2185b6a0d9248c72e7a7e61 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,6 +20,24 @@ namespace leveldb { +static bool SnappyCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return 
port::Snappy_Compress(in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(in.data(), in.size(), &out); +} + +static bool BZip2CompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::BZip2_Compress(in.data(), in.size(), &out); +} + static std::string RandomString(Random* rnd, int len) { std::string r; test::RandomString(rnd, len, &r); @@ -1058,6 +1076,80 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } +void MinLevelHelper(DBTest* self, Options& options) { + Random rnd(301); + + for (int num = 0; + num < options.level0_file_num_compaction_trigger - 1; + num++) + { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompactMemTable(); + ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); + ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); +} + +TEST(DBTest, MinLevelToCompress) { + Options options = CurrentOptions(); + options.write_buffer_size = 100<<10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; + options.create_if_missing = true; + CompressionType type; + + if (SnappyCompressionSupported()) { + type = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if (ZlibCompressionSupported()) { + type = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2CompressionSupported()) { + type = 
kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } + options.compression_per_level = new CompressionType[options.num_levels]; + + // do not compress L0 + for (int i = 0; i < 1; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 1; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + Reopen(&options); + MinLevelHelper(this, options); + + // do not compress L0 and L1 + for (int i = 0; i < 2; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 2; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} + TEST(DBTest, RepeatedWritesToSameKey) { Options options = CurrentOptions(); options.env = env_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 3b94d829f86a01368e0ea32454474df132a1988f..c75e725f59bab2f71cc1a454d7d7b8f2896c44e2 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -134,6 +134,20 @@ struct Options { // efficiently detect that and will switch to uncompressed mode. CompressionType compression; + // Different levels can have different compression policies. There + // are cases where most lower levels would like to use a quick compression + // algorithm while the higher levels (which have more data) use + // compression algorithms that have better compression but could + // be slower. This array, if non NULL, should have an entry for + // each level of the database. This array, if non NULL, overrides the + // value specified in the previous field 'compression'. The caller is + // responsible for allocating memory and initializing the values in it + // before invoking Open(). The caller is responsible for freeing this + // array and it could be freed anytime after the return from Open(). 
+ // This could have been a std::vector but that makes the equivalent + // java/C api hard to construct. + CompressionType* compression_per_level; + // If non-NULL, use the specified filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h index 5fd1dc71f1cb7541ef62397b6795946ad8c20652..5c755137e75a72cd299bf11ca9e1311c2e517ca5 100644 --- a/include/leveldb/table_builder.h +++ b/include/leveldb/table_builder.h @@ -27,8 +27,10 @@ class TableBuilder { public: // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the - // caller to close the file after calling Finish(). - TableBuilder(const Options& options, WritableFile* file); + // caller to close the file after calling Finish(). The output file + // will be part of the level specified by 'level'. A value of -1 means + // that the caller does not know in which level the output file will reside. + TableBuilder(const Options& options, WritableFile* file, int level=-1); // REQUIRES: Either Finish() or Abandon() has been called. 
~TableBuilder(); @@ -81,6 +83,7 @@ class TableBuilder { struct Rep; Rep* rep_; + int level_; // No copying allowed TableBuilder(const TableBuilder&); diff --git a/table/table_builder.cc b/table/table_builder.cc index eabdedeada10530920984b5e3cddd43457e6ac72..5306b2976c20c64619a704b403e6b2c55998d658 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -60,8 +60,9 @@ struct TableBuilder::Rep { } }; -TableBuilder::TableBuilder(const Options& options, WritableFile* file) - : rep_(new Rep(options, file)) { +TableBuilder::TableBuilder(const Options& options, WritableFile* file, + int level) + : rep_(new Rep(options, file)), level_(level) { if (rep_->filter_block != NULL) { rep_->filter_block->StartBlock(0); } @@ -152,7 +153,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; - CompressionType type = r->options.compression; + CompressionType type; + // If the user has specified a different compression level for each level, + // then pick the compression for that level. + if (r->options.compression_per_level != NULL) { + if (level_ == -1) { + // this is mostly for backward compatibility. The builder does not + // know which level this file belongs to. Apply the compression level + // specified for level 0 to all levels. 
+ type = r->options.compression_per_level[0]; + } else { + type = r->options.compression_per_level[level_]; + } + } else { + type = r->options.compression; + } switch (type) { case kNoCompression: block_contents = raw; diff --git a/util/options.cc b/util/options.cc index eb132877ef8c7309b58ea2e5d9ef0c296a07cf01..40933c54db359528a35903393b38a4a57eab281c 100644 --- a/util/options.cc +++ b/util/options.cc @@ -24,6 +24,7 @@ Options::Options() block_size(4096), block_restart_interval(16), compression(kSnappyCompression), + compression_per_level(NULL), filter_policy(NULL), num_levels(7), level0_file_num_compaction_trigger(4), @@ -63,7 +64,14 @@ Options::Dump( Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); Log(log," Options.block_size: %zd", block_size); Log(log,"Options.block_restart_interval: %d", block_restart_interval); - Log(log," Options.compression: %d", compression); + if (compression_per_level != NULL) { + for (unsigned int i = 0; i < num_levels; i++){ + Log(log," Options.compression[%d]: %d", + i, compression_per_level[i]); + } + } else { + Log(log," Options.compression: %d", compression); + } Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name()); Log(log," Options.num_levels: %d", num_levels);