version_edit.cc 19.0 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11 12
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_edit.h"

#include "db/version_set.h"
13
#include "rocksdb/slice.h"
J
jorlow@chromium.org 已提交
14
#include "util/coding.h"
15
#include "util/event_logger.h"
16
#include "util/string_util.h"
17
#include "util/sync_point.h"
J
jorlow@chromium.org 已提交
18

19
namespace rocksdb {
J
jorlow@chromium.org 已提交
20 21 22 23

// Tag numbers identifying each field of a serialized VersionEdit. These
// numbers are written to disk, so an existing value must never be changed
// or reused for a different purpose.
enum Tag {
  kComparator = 1,
  kLogNumber = 2,
  kNextFileNumber = 3,
  kLastSequence = 4,
  kCompactPointer = 5,
  kDeletedFile = 6,
  kNewFile = 7,
  // 8 was used for large value refs
  kPrevLogNumber = 9,
  kDeletedLogNumber = 10,

  // these are new formats divergent from open source leveldb
  kNewFile2 = 100,
  kNewFile3 = 102,
  kNewFile4 = 103,      // 4th (the latest) format version of adding files
  kColumnFamily = 200,  // specify column family for version edit
  kColumnFamilyAdd = 201,
  kColumnFamilyDrop = 202,
  kMaxColumnFamily = 203,
};

45 46 47
// Tags for the customized fields that follow the fixed part of a kNewFile4
// record. Each customized field is written as a tag followed by a
// length-prefixed payload, and the list ends with kTerminate.
enum CustomTag {
  kTerminate = 1,  // The end of customized fields
  kNeedCompaction = 2,
  // Since the manifest is currently not entirely forward-compatible, and the
  // only forward-compatible part is the CustomTag of kNewFile, we currently
  // encode kDeletedLogNumber as part of a CustomTag as a hack. This should be
  // removed when the manifest becomes forward-compatible.
  kDeletedLogNumberHack = 3,
  kPathId = 65,
};
// Bit mask tested against a custom tag while decoding a kNewFile4 record.
// If this bit is set in a custom tag that the reader does not recognize, the
// field is not safe to ignore and opening the DB should fail; unrecognized
// custom tags without this bit are skipped.
uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6;

59 60 61 62 63
// Combines a file number and a path id into one 64-bit value. The file number
// must not exceed kFileNumberMask (checked by assert); the path id is scaled
// by (kFileNumberMask + 1) and OR-ed with the file number.
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
  assert(number <= kFileNumberMask);
  const uint64_t path_id_unit = kFileNumberMask + 1;
  const uint64_t scaled_path_id = path_id * path_id_unit;
  return scaled_path_id | number;
}

J
jorlow@chromium.org 已提交
64 65
void VersionEdit::Clear() {
  comparator_.clear();
66
  max_level_ = 0;
J
jorlow@chromium.org 已提交
67
  log_number_ = 0;
68
  prev_log_number_ = 0;
J
jorlow@chromium.org 已提交
69 70
  last_sequence_ = 0;
  next_file_number_ = 0;
71
  max_column_family_ = 0;
72
  deleted_log_number_ = 0;
J
jorlow@chromium.org 已提交
73 74
  has_comparator_ = false;
  has_log_number_ = false;
75
  has_prev_log_number_ = false;
J
jorlow@chromium.org 已提交
76 77
  has_next_file_number_ = false;
  has_last_sequence_ = false;
78
  has_max_column_family_ = false;
79
  has_deleted_log_number_ = false;
J
jorlow@chromium.org 已提交
80 81
  deleted_files_.clear();
  new_files_.clear();
82 83 84 85
  column_family_ = 0;
  is_column_family_add_ = 0;
  is_column_family_drop_ = 0;
  column_family_name_.clear();
J
jorlow@chromium.org 已提交
86 87
}

88
bool VersionEdit::EncodeTo(std::string* dst) const {
J
jorlow@chromium.org 已提交
89 90 91 92 93
  if (has_comparator_) {
    PutVarint32(dst, kComparator);
    PutLengthPrefixedSlice(dst, comparator_);
  }
  if (has_log_number_) {
94
    PutVarint32Varint64(dst, kLogNumber, log_number_);
J
jorlow@chromium.org 已提交
95
  }
96
  if (has_prev_log_number_) {
97
    PutVarint32Varint64(dst, kPrevLogNumber, prev_log_number_);
98
  }
J
jorlow@chromium.org 已提交
99
  if (has_next_file_number_) {
100
    PutVarint32Varint64(dst, kNextFileNumber, next_file_number_);
J
jorlow@chromium.org 已提交
101 102
  }
  if (has_last_sequence_) {
103
    PutVarint32Varint64(dst, kLastSequence, last_sequence_);
J
jorlow@chromium.org 已提交
104
  }
105
  if (has_max_column_family_) {
106
    PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
107
  }
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
  if (has_deleted_log_number_) {
    // TODO(myabandeh): uncomment me when manifest is forward-compatible
    // PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_);
    // Since currently manifest is not forward compatible we encode this entry
    // disguised as a kNewFile4 entry which has forward-compatible extensions.
    PutVarint32(dst, kNewFile4);
    PutVarint32Varint64(dst, 0u, 0ull); // level and number
    PutVarint64(dst, 0ull); // file size
    InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue);
    PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest
    PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest
    PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst
    PutVarint32(dst, CustomTag::kDeletedLogNumberHack);
    std::string buf;
    PutFixed64(&buf, deleted_log_number_);
    PutLengthPrefixedSlice(dst, Slice(buf));
    PutVarint32(dst, CustomTag::kTerminate);
  }
J
jorlow@chromium.org 已提交
126

K
kailiu 已提交
127
  for (const auto& deleted : deleted_files_) {
128 129
    PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
                                deleted.second /* file number */);
J
jorlow@chromium.org 已提交
130 131
  }

D
dgrogan@chromium.org 已提交
132
  for (size_t i = 0; i < new_files_.size(); i++) {
J
jorlow@chromium.org 已提交
133
    const FileMetaData& f = new_files_[i].second;
134 135 136
    if (!f.smallest.Valid() || !f.largest.Valid()) {
      return false;
    }
137 138 139 140 141
    bool has_customized_fields = false;
    if (f.marked_for_compaction) {
      PutVarint32(dst, kNewFile4);
      has_customized_fields = true;
    } else if (f.fd.GetPathId() == 0) {
142 143 144 145 146 147
      // Use older format to make sure user can roll back the build if they
      // don't config multiple DB paths.
      PutVarint32(dst, kNewFile2);
    } else {
      PutVarint32(dst, kNewFile3);
    }
148
    PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber());
149 150
    if (f.fd.GetPathId() != 0 && !has_customized_fields) {
      // kNewFile3
151 152
      PutVarint32(dst, f.fd.GetPathId());
    }
153
    PutVarint64(dst, f.fd.GetFileSize());
J
jorlow@chromium.org 已提交
154 155
    PutLengthPrefixedSlice(dst, f.smallest.Encode());
    PutLengthPrefixedSlice(dst, f.largest.Encode());
156
    PutVarint64Varint64(dst, f.smallest_seqno, f.largest_seqno);
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
    if (has_customized_fields) {
      // Customized fields' format:
      // +-----------------------------+
      // | 1st field's tag (varint32)  |
      // +-----------------------------+
      // | 1st field's size (varint32) |
      // +-----------------------------+
      // |    bytes for 1st field      |
      // |  (based on size decoded)    |
      // +-----------------------------+
      // |                             |
      // |          ......             |
      // |                             |
      // +-----------------------------+
      // | last field's size (varint32)|
      // +-----------------------------+
      // |    bytes for last field     |
      // |  (based on size decoded)    |
      // +-----------------------------+
      // | terminating tag (varint32)  |
      // +-----------------------------+
      //
      // Customized encoding for fields:
      //   tag kPathId: 1 byte as path_id
      //   tag kNeedCompaction:
      //        now only can take one char value 1 indicating need-compaction
      //
      if (f.fd.GetPathId() != 0) {
        PutVarint32(dst, CustomTag::kPathId);
        char p = static_cast<char>(f.fd.GetPathId());
        PutLengthPrefixedSlice(dst, Slice(&p, 1));
      }
      if (f.marked_for_compaction) {
        PutVarint32(dst, CustomTag::kNeedCompaction);
        char p = static_cast<char>(1);
        PutLengthPrefixedSlice(dst, Slice(&p, 1));
      }
      TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
                               dst);

      PutVarint32(dst, CustomTag::kTerminate);
    }
J
jorlow@chromium.org 已提交
199
  }
200 201 202

  // 0 is default and does not need to be explicitly written
  if (column_family_ != 0) {
203
    PutVarint32Varint32(dst, kColumnFamily, column_family_);
204 205 206 207 208 209 210 211 212 213
  }

  if (is_column_family_add_) {
    PutVarint32(dst, kColumnFamilyAdd);
    PutLengthPrefixedSlice(dst, Slice(column_family_name_));
  }

  if (is_column_family_drop_) {
    PutVarint32(dst, kColumnFamilyDrop);
  }
214
  return true;
J
jorlow@chromium.org 已提交
215 216 217 218 219 220
}

// Reads one length-prefixed slice from *input and decodes it into *dst.
// Returns false if the slice cannot be read or the decoded key is not valid.
static bool GetInternalKey(Slice* input, InternalKey* dst) {
  Slice encoded;
  if (!GetLengthPrefixedSlice(input, &encoded)) {
    return false;
  }
  dst->DecodeFrom(encoded);
  return dst->Valid();
}

A
Andrew Kryczka 已提交
227
// Reads a varint32 level from *input into *level and raises max_level_ to it
// if it is larger than the current maximum. Returns false if the varint
// cannot be read. The msg parameter is currently unused.
bool VersionEdit::GetLevel(Slice* input, int* level, const char** /*msg*/) {
  uint32_t decoded;
  if (!GetVarint32(input, &decoded)) {
    return false;
  }
  *level = decoded;
  if (max_level_ < *level) {
    max_level_ = *level;
  }
  return true;
}

240 241 242 243 244 245 246
// Decodes the body of a kNewFile4 record (the tag itself has already been
// consumed) from *input.
//
// Returns nullptr on success, otherwise a static string describing what
// could not be decoded. On success the decoded file is appended to
// new_files_, unless the record turned out to be a disguised
// kDeletedLogNumber entry, in which case only deleted_log_number_ and
// has_deleted_log_number_ are updated.
const char* VersionEdit::DecodeNewFile4From(Slice* input) {
  const char* msg = nullptr;  // only passed to GetLevel(), which ignores it
  int level = 0;
  FileMetaData f;
  uint64_t number = 0;
  uint32_t path_id = 0;
  uint64_t file_size = 0;
  // Since this is the only forward-compatible part of the code, we hack new
  // extension into this record. When we do, we set this boolean to distinguish
  // the record from the normal NewFile records.
  bool this_is_not_a_new_file_record = false;

  // Fixed part of the record, in the order EncodeTo() writes it.
  const bool fixed_part_ok =
      GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
      GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
      GetInternalKey(input, &f.largest) &&
      GetVarint64(input, &f.smallest_seqno) &&
      GetVarint64(input, &f.largest_seqno);
  if (!fixed_part_ok) {
    return "new-file4 entry";
  }

  // Customized fields: (tag, length-prefixed payload) pairs terminated by
  // kTerminate. See comments in VersionEdit::EncodeTo() for the format.
  while (true) {
    uint32_t custom_tag;
    Slice field;
    if (!GetVarint32(input, &custom_tag)) {
      return "new-file4 custom field";
    }
    if (custom_tag == kTerminate) {
      break;
    }
    if (!GetLengthPrefixedSlice(input, &field)) {
      return "new-file4 custom field length prefixed slice error";
    }
    switch (custom_tag) {
      case kPathId:
        if (field.size() != 1) {
          return "path_id field wrong size";
        }
        path_id = field[0];
        if (path_id > 3) {
          return "path_id wrong value";
        }
        break;
      case kNeedCompaction:
        if (field.size() != 1) {
          return "need_compaction field wrong size";
        }
        f.marked_for_compaction = (field[0] == 1);
        break;
      case kDeletedLogNumberHack:
        // This is a hack to encode kDeletedLogNumber in a forward-compatible
        // fashion.
        this_is_not_a_new_file_record = true;
        if (!GetFixed64(&field, &deleted_log_number_)) {
          return "deleted log number malformatted";
        }
        has_deleted_log_number_ = true;
        break;
      default:
        if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
          // Should not proceed if cannot understand it
          return "new-file4 custom field not supported";
        }
        // Unknown but safe-to-ignore field: its payload was already consumed.
        break;
    }
  }

  if (this_is_not_a_new_file_record) {
    // Since this has nothing to do with NewFile, return immediately.
    return nullptr;
  }
  f.fd = FileDescriptor(number, path_id, file_size);
  new_files_.push_back(std::make_pair(level, f));
  return nullptr;
}

J
jorlow@chromium.org 已提交
314 315 316
// Replaces the contents of this edit with the edit serialized in src.
//
// The edit is cleared first, then src is consumed as a sequence of tagged
// fields. Returns an OK status when the whole input was decoded, otherwise
// Status::Corruption("VersionEdit", <what failed>). On failure the edit keeps
// whatever fields were decoded before the error.
Status VersionEdit::DecodeFrom(const Slice& src) {
  Clear();
  Slice input = src;
  const char* msg = nullptr;
  uint32_t tag = 0;

  // Temporary storage for parsing
  int level = 0;

  while (msg == nullptr && GetVarint32(&input, &tag)) {
    switch (tag) {
      case kComparator: {
        Slice name;
        if (GetLengthPrefixedSlice(&input, &name)) {
          comparator_ = name.ToString();
          has_comparator_ = true;
        } else {
          msg = "comparator name";
        }
        break;
      }

      case kLogNumber:
        if (GetVarint64(&input, &log_number_)) {
          has_log_number_ = true;
        } else {
          msg = "log number";
        }
        break;

      case kPrevLogNumber:
        if (GetVarint64(&input, &prev_log_number_)) {
          has_prev_log_number_ = true;
        } else {
          msg = "previous log number";
        }
        break;

      case kNextFileNumber:
        if (GetVarint64(&input, &next_file_number_)) {
          has_next_file_number_ = true;
        } else {
          msg = "next file number";
        }
        break;

      case kLastSequence:
        if (GetVarint64(&input, &last_sequence_)) {
          has_last_sequence_ = true;
        } else {
          msg = "last sequence number";
        }
        break;

      case kMaxColumnFamily:
        if (GetVarint32(&input, &max_column_family_)) {
          has_max_column_family_ = true;
        } else {
          msg = "max column family";
        }
        break;

      case kDeletedLogNumber:
        if (GetVarint64(&input, &deleted_log_number_)) {
          has_deleted_log_number_ = true;
        } else {
          msg = "deleted log number";
        }
        break;

      case kCompactPointer: {
        // we don't use compact pointers anymore,
        // but we should not fail if they are still
        // in manifest, so the record is parsed and then dropped
        InternalKey key;
        const bool parsed =
            GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key);
        if (!parsed && !msg) {
          msg = "compaction pointer";
        }
        break;
      }

      case kDeletedFile: {
        uint64_t number = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number)) {
          deleted_files_.insert(std::make_pair(level, number));
        } else {
          if (!msg) {
            msg = "deleted file";
          }
        }
        break;
      }

      // Each new-file record below decodes into its own FileMetaData so that
      // fields a format does not carry (e.g. the seqnos for kNewFile) keep
      // their defaults instead of inheriting values from an earlier record.
      case kNewFile: {
        FileMetaData f;
        uint64_t number = 0;
        uint64_t file_size = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest)) {
          f.fd = FileDescriptor(number, 0, file_size);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file entry";
          }
        }
        break;
      }

      case kNewFile2: {
        FileMetaData f;
        uint64_t number = 0;
        uint64_t file_size = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest) &&
            GetVarint64(&input, &f.smallest_seqno) &&
            GetVarint64(&input, &f.largest_seqno)) {
          f.fd = FileDescriptor(number, 0, file_size);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file2 entry";
          }
        }
        break;
      }

      case kNewFile3: {
        FileMetaData f;
        uint64_t number = 0;
        uint32_t path_id = 0;
        uint64_t file_size = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest) &&
            GetVarint64(&input, &f.smallest_seqno) &&
            GetVarint64(&input, &f.largest_seqno)) {
          f.fd = FileDescriptor(number, path_id, file_size);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file3 entry";
          }
        }
        break;
      }

      case kNewFile4: {
        msg = DecodeNewFile4From(&input);
        break;
      }

      case kColumnFamily:
        if (!GetVarint32(&input, &column_family_)) {
          if (!msg) {
            msg = "set column family id";
          }
        }
        break;

      case kColumnFamilyAdd: {
        Slice name;
        if (GetLengthPrefixedSlice(&input, &name)) {
          is_column_family_add_ = true;
          column_family_name_ = name.ToString();
        } else {
          if (!msg) {
            msg = "column family add";
          }
        }
        break;
      }

      case kColumnFamilyDrop:
        is_column_family_drop_ = true;
        break;

      default:
        msg = "unknown tag";
        break;
    }
  }

  // The loop also stops when a tag varint cannot be read; leftover bytes at
  // that point mean the input is malformed.
  if (msg == nullptr && !input.empty()) {
    msg = "invalid tag";
  }

  Status result;
  if (msg != nullptr) {
    result = Status::Corruption("VersionEdit", msg);
  }
  return result;
}

510
std::string VersionEdit::DebugString(bool hex_key) const {
J
jorlow@chromium.org 已提交
511 512 513 514 515 516 517 518 519 520
  std::string r;
  r.append("VersionEdit {");
  if (has_comparator_) {
    r.append("\n  Comparator: ");
    r.append(comparator_);
  }
  if (has_log_number_) {
    r.append("\n  LogNumber: ");
    AppendNumberTo(&r, log_number_);
  }
521 522 523 524
  if (has_prev_log_number_) {
    r.append("\n  PrevLogNumber: ");
    AppendNumberTo(&r, prev_log_number_);
  }
J
jorlow@chromium.org 已提交
525
  if (has_next_file_number_) {
526
    r.append("\n  NextFileNumber: ");
J
jorlow@chromium.org 已提交
527 528 529 530 531 532 533 534 535 536 537 538 539 540
    AppendNumberTo(&r, next_file_number_);
  }
  if (has_last_sequence_) {
    r.append("\n  LastSeq: ");
    AppendNumberTo(&r, last_sequence_);
  }
  for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
       iter != deleted_files_.end();
       ++iter) {
    r.append("\n  DeleteFile: ");
    AppendNumberTo(&r, iter->first);
    r.append(" ");
    AppendNumberTo(&r, iter->second);
  }
D
dgrogan@chromium.org 已提交
541
  for (size_t i = 0; i < new_files_.size(); i++) {
J
jorlow@chromium.org 已提交
542 543 544 545
    const FileMetaData& f = new_files_[i].second;
    r.append("\n  AddFile: ");
    AppendNumberTo(&r, new_files_[i].first);
    r.append(" ");
546
    AppendNumberTo(&r, f.fd.GetNumber());
J
jorlow@chromium.org 已提交
547
    r.append(" ");
548
    AppendNumberTo(&r, f.fd.GetFileSize());
G
Gabor Cselle 已提交
549
    r.append(" ");
550
    r.append(f.smallest.DebugString(hex_key));
G
Gabor Cselle 已提交
551
    r.append(" .. ");
552
    r.append(f.largest.DebugString(hex_key));
J
jorlow@chromium.org 已提交
553
  }
554 555 556 557 558 559 560 561 562
  r.append("\n  ColumnFamily: ");
  AppendNumberTo(&r, column_family_);
  if (is_column_family_add_) {
    r.append("\n  ColumnFamilyAdd: ");
    r.append(column_family_name_);
  }
  if (is_column_family_drop_) {
    r.append("\n  ColumnFamilyDrop");
  }
563 564
  if (has_max_column_family_) {
    r.append("\n  MaxColumnFamily: ");
565
    AppendNumberTo(&r, max_column_family_);
566
  }
567 568 569 570
  if (has_deleted_log_number_) {
    r.append("\n  DeletedLogNumber: ");
    AppendNumberTo(&r, deleted_log_number_);
  }
J
jorlow@chromium.org 已提交
571 572 573 574
  r.append("\n}\n");
  return r;
}

575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639
std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
  JSONWriter jw;
  jw << "EditNumber" << edit_num;

  if (has_comparator_) {
    jw << "Comparator" << comparator_;
  }
  if (has_log_number_) {
    jw << "LogNumber" << log_number_;
  }
  if (has_prev_log_number_) {
    jw << "PrevLogNumber" << prev_log_number_;
  }
  if (has_next_file_number_) {
    jw << "NextFileNumber" << next_file_number_;
  }
  if (has_last_sequence_) {
    jw << "LastSeq" << last_sequence_;
  }

  if (!deleted_files_.empty()) {
    jw << "DeletedFiles";
    jw.StartArray();

    for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
         iter != deleted_files_.end();
         ++iter) {
      jw.StartArrayedObject();
      jw << "Level" << iter->first;
      jw << "FileNumber" << iter->second;
      jw.EndArrayedObject();
    }

    jw.EndArray();
  }

  if (!new_files_.empty()) {
    jw << "AddedFiles";
    jw.StartArray();

    for (size_t i = 0; i < new_files_.size(); i++) {
      jw.StartArrayedObject();
      jw << "Level" << new_files_[i].first;
      const FileMetaData& f = new_files_[i].second;
      jw << "FileNumber" << f.fd.GetNumber();
      jw << "FileSize" << f.fd.GetFileSize();
      jw << "SmallestIKey" << f.smallest.DebugString(hex_key);
      jw << "LargestIKey" << f.largest.DebugString(hex_key);
      jw.EndArrayedObject();
    }

    jw.EndArray();
  }

  jw << "ColumnFamily" << column_family_;

  if (is_column_family_add_) {
    jw << "ColumnFamilyAdd" << column_family_name_;
  }
  if (is_column_family_drop_) {
    jw << "ColumnFamilyDrop" << column_family_name_;
  }
  if (has_max_column_family_) {
    jw << "MaxColumnFamily" << max_column_family_;
  }
640 641 642
  if (has_deleted_log_number_) {
    jw << "DeletedLogNumber" << deleted_log_number_;
  }
643 644 645 646 647 648

  jw.EndObject();

  return jw.Get();
}

649
}  // namespace rocksdb