backupable_db.cc 87.2 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
I
Igor Canadi 已提交
5 6 7 8 9
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10 11
#ifndef ROCKSDB_LITE

12 13
#include "rocksdb/utilities/backupable_db.h"

14
#include <stdlib.h>
15

I
Igor Canadi 已提交
16
#include <algorithm>
W
Wanning Jiang 已提交
17
#include <atomic>
18
#include <cinttypes>
19
#include <functional>
W
Wanning Jiang 已提交
20 21
#include <future>
#include <limits>
I
Igor Canadi 已提交
22
#include <map>
23
#include <mutex>
I
Igor Canadi 已提交
24
#include <sstream>
I
Igor Canadi 已提交
25
#include <string>
26
#include <thread>
27
#include <unordered_map>
28
#include <unordered_set>
W
Wanning Jiang 已提交
29
#include <vector>
I
Igor Canadi 已提交
30

31
#include "env/composite_env_wrapper.h"
32 33 34 35 36 37 38
#include "file/filename.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h"
39
#include "table/sst_file_dumper.h"
40 41 42 43 44 45 46
#include "test_util/sync_point.h"
#include "util/channel.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/string_util.h"
#include "utilities/checkpoint/checkpoint_impl.h"

47
namespace ROCKSDB_NAMESPACE {
I
Igor Canadi 已提交
48

49
namespace {
50 51
using ShareFilesNaming = BackupableDBOptions::ShareFilesNaming;

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
// Converts a hex-encoded checksum string into its 32-bit integer value.
//
// The hex text is decoded into raw bytes, the leading four bytes are read as
// a fixed-width 32-bit value and the byte order is swapped; this mirrors the
// encoding performed by ChecksumInt32ToHex.
//
// If the input decodes to fewer than four bytes (empty, truncated or
// malformed hex), the missing bytes are treated as zero so that the
// fixed-width read never runs past the end of the decoded buffer.
inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) {
  std::string checksum_str;
  Slice(checksum_hex).DecodeHex(&checksum_str);
  if (checksum_str.size() < sizeof(uint32_t)) {
    // Guarantee at least four readable bytes for the fixed-width decode.
    checksum_str.resize(sizeof(uint32_t), '\0');
  }
  return EndianSwapValue(DecodeFixed32(checksum_str.c_str()));
}
inline std::string ChecksumStrToHex(const std::string& checksum_str) {
  return Slice(checksum_str).ToString(true);
}
// Encodes a 32-bit checksum as hexadecimal text: the value is byte-swapped,
// serialized as a fixed-width 32-bit field and then rendered as hex.
inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) {
  const uint32_t swapped = EndianSwapValue(checksum_value);
  std::string encoded;
  PutFixed32(&encoded, swapped);
  return ChecksumStrToHex(encoded);
}
}  // namespace

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Bumps the count of successful backups by one.
void BackupStatistics::IncrementNumberSuccessBackup() {
  ++number_success_backup;
}
// Bumps the count of failed backups by one.
void BackupStatistics::IncrementNumberFailBackup() {
  ++number_fail_backup;
}

// Returns how many backups have been recorded as successful.
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  return static_cast<uint32_t>(number_success_backup);
}
// Returns how many backups have been recorded as failed.
uint32_t BackupStatistics::GetNumberFailBackup() const {
  return static_cast<uint32_t>(number_fail_backup);
}

// Renders both counters as a single human-readable line of the form
// "# success backup: N, # fail backup: M".
std::string BackupStatistics::ToString() const {
  // The fixed text is 35 characters and each 32-bit counter prints at most 10
  // digits, so the longest possible message is 55 characters plus the
  // terminating NUL. A 50-byte buffer would silently truncate large counts.
  char result[64];
  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
           GetNumberSuccessBackup(), GetNumberFailBackup());
  return result;
}

I
Igor Canadi 已提交
88
// Writes the backup options to `logger` at INFO level, one option per line.
// Boolean-like options are printed as integers.
void BackupableDBOptions::Dump(Logger* logger) const {
  const int share_table_files_flag = static_cast<int>(share_table_files);
  const int sync_flag = static_cast<int>(sync);
  const int destroy_old_data_flag = static_cast<int>(destroy_old_data);
  const int backup_log_files_flag = static_cast<int>(backup_log_files);

  ROCKS_LOG_INFO(logger, "               Options.backup_dir: %s",
                 backup_dir.c_str());
  ROCKS_LOG_INFO(logger, "               Options.backup_env: %p", backup_env);
  ROCKS_LOG_INFO(logger, "        Options.share_table_files: %d",
                 share_table_files_flag);
  ROCKS_LOG_INFO(logger, "                 Options.info_log: %p", info_log);
  ROCKS_LOG_INFO(logger, "                     Options.sync: %d", sync_flag);
  ROCKS_LOG_INFO(logger, "         Options.destroy_old_data: %d",
                 destroy_old_data_flag);
  ROCKS_LOG_INFO(logger, "         Options.backup_log_files: %d",
                 backup_log_files_flag);
  ROCKS_LOG_INFO(logger, "        Options.backup_rate_limit: %" PRIu64,
                 backup_rate_limit);
  ROCKS_LOG_INFO(logger, "       Options.restore_rate_limit: %" PRIu64,
                 restore_rate_limit);
  ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
                 max_background_operations);
}

I
Igor Canadi 已提交
109 110
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
111
 public:
112
  BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
I
Igor Canadi 已提交
113
                   bool read_only = false);
114
  ~BackupEngineImpl() override;
115 116 117 118 119

  using BackupEngine::CreateNewBackupWithMetadata;
  Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
                                     const std::string& app_metadata) override;

I
Igor Sugak 已提交
120
  Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
121

I
Igor Sugak 已提交
122
  Status DeleteBackup(BackupID backup_id) override;
123

I
Igor Sugak 已提交
124
  void StopBackup() override {
I
Igor Canadi 已提交
125 126
    stop_backup_.store(true, std::memory_order_release);
  }
127

I
Igor Sugak 已提交
128 129
  Status GarbageCollect() override;

130 131
  // The returned BackupInfos are in chronological order, which means the
  // latest backup comes last.
I
Igor Sugak 已提交
132
  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
133

I
Igor Sugak 已提交
134
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
135 136 137 138 139 140 141 142 143 144 145 146

  using BackupEngine::RestoreDBFromBackup;
  Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
                             const std::string& db_dir,
                             const std::string& wal_dir) override;

  using BackupEngine::RestoreDBFromLatestBackup;
  // Restores from the backup identified by latest_valid_backup_id_ by
  // forwarding to RestoreDBFromBackup with the same options and directories.
  Status RestoreDBFromLatestBackup(const RestoreOptions& options,
                                   const std::string& db_dir,
                                   const std::string& wal_dir) override {
    const BackupID newest_valid_id = latest_valid_backup_id_;
    return RestoreDBFromBackup(options, newest_valid_id, db_dir, wal_dir);
  }

149
  Status VerifyBackup(BackupID backup_id,
150
                      bool verify_with_checksum = false) override;
151

152 153
  Status Initialize();

154 155 156 157 158 159 160
  // Returns the configured shared-file naming option masked with
  // kMaskNoNamingFlags.
  ShareFilesNaming GetNamingNoFlags() const {
    const auto configured = options_.share_files_with_checksum_naming;
    return configured & BackupableDBOptions::kMaskNoNamingFlags;
  }
  // Returns the configured shared-file naming option masked with
  // kMaskNamingFlags.
  ShareFilesNaming GetNamingFlags() const {
    const auto configured = options_.share_files_with_checksum_naming;
    return configured & BackupableDBOptions::kMaskNamingFlags;
  }
162

I
Igor Canadi 已提交
163
 private:
164
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
165
  Status DeleteBackupInternal(BackupID backup_id);
166

167
  // Extends the "result" map with pathname->size mappings for the contents of
168
  // "dir" in "env". Pathnames are prefixed with "dir".
169
  Status InsertPathnameToSizeBytes(
170
      const std::string& dir, Env* env,
171 172
      std::unordered_map<std::string, uint64_t>* result);

L
Lei Jin 已提交
173
  // Describes one backed-up file: its name, size and checksum, the identity
  // of the DB it came from, and a mutable reference count. Non-copyable.
  struct FileInfo {
    FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
             const std::string& id = "", const std::string& sid = "")
        : filename(fname),
          size(sz),
          checksum_hex(checksum),
          db_id(id),
          db_session_id(sid) {}

    FileInfo(const FileInfo&) = delete;
    FileInfo& operator=(const FileInfo&) = delete;

    // Reference count; starts at zero.
    int refs = 0;
    const std::string filename;
    const uint64_t size;
    // Checksum of the file, as hexadecimal text.
    const std::string checksum_hex;
    // DB identities.
    // db_id is collected for possible future use; nothing reads it yet.
    const std::string db_id;
    // db_session_id is embedded in the backup SST filename when the table
    // naming option is kUseDbSessionId.
    const std::string db_session_id;
  };

I
Igor Canadi 已提交
199 200
  class BackupMeta {
   public:
201 202
    BackupMeta(
        const std::string& meta_filename, const std::string& meta_tmp_filename,
203 204
        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
        Env* env)
205 206 207 208 209 210 211
        : timestamp_(0),
          sequence_number_(0),
          size_(0),
          meta_filename_(meta_filename),
          meta_tmp_filename_(meta_tmp_filename),
          file_infos_(file_infos),
          env_(env) {}
I
Igor Canadi 已提交
212

213 214 215
    BackupMeta(const BackupMeta&) = delete;
    BackupMeta& operator=(const BackupMeta&) = delete;

I
Igor Canadi 已提交
216 217
    ~BackupMeta() {}

218
    Status RecordTimestamp() { return env_->GetCurrentTime(&timestamp_); }
I
Igor Canadi 已提交
219 220 221 222 223 224
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
225
    uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
226 227 228 229 230 231
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
232

233 234 235 236 237 238
    const std::string& GetAppMetadata() const { return app_metadata_; }

    void SetAppMetadata(const std::string& app_metadata) {
      app_metadata_ = app_metadata;
    }

239
    Status AddFile(std::shared_ptr<FileInfo> file_info);
L
Lei Jin 已提交
240

241
    Status Delete(bool delete_meta = true);
I
Igor Canadi 已提交
242 243 244 245 246

    bool Empty() {
      return files_.empty();
    }

247
    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
248 249 250
      auto it = file_infos_->find(filename);
      if (it == file_infos_->end())
        return nullptr;
251
      return it->second;
252 253
    }

254
    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
I
Igor Canadi 已提交
255 256 257
      return files_;
    }

258 259
    // @param abs_path_to_size Pre-fetched file sizes (bytes).
    Status LoadFromFile(
260
        const std::string& backup_dir,
261
        const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
I
Igor Canadi 已提交
262 263
    Status StoreToFile(bool sync);

I
Igor Canadi 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
    std::string GetInfoString() {
      std::ostringstream ss;
      ss << "Timestamp: " << timestamp_ << std::endl;
      char human_size[16];
      AppendHumanBytes(size_, human_size, sizeof(human_size));
      ss << "Size: " << human_size << std::endl;
      ss << "Files:" << std::endl;
      for (const auto& file : files_) {
        AppendHumanBytes(file->size, human_size, sizeof(human_size));
        ss << file->filename << ", size " << human_size << ", refs "
           << file->refs << std::endl;
      }
      return ss.str();
    }

I
Igor Canadi 已提交
279 280
   private:
    int64_t timestamp_;
281 282 283
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
284
    uint64_t size_;
285
    std::string app_metadata_;
I
Igor Canadi 已提交
286
    std::string const meta_filename_;
287
    std::string const meta_tmp_filename_;
I
Igor Canadi 已提交
288
    // files with relative paths (without "/" prefix!!)
289 290
    std::vector<std::shared_ptr<FileInfo>> files_;
    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
I
Igor Canadi 已提交
291
    Env* env_;
292

293 294
    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  };  // BackupMeta
I
Igor Canadi 已提交
295 296 297 298 299 300 301 302 303

  // Joins the configured backup directory and `relative_path` with a '/'.
  // `relative_path` must not begin with '/'.
  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.empty() || relative_path[0] != '/');
    std::string absolute = options_.backup_dir;
    absolute += "/";
    absolute += relative_path;
    return absolute;
  }
  // Name of the per-backup private directory, relative to the backup root.
  inline std::string GetPrivateDirRel() const {
    return std::string("private");
  }
304
  // Name of the shared-files directory, relative to the backup root.
  inline std::string GetSharedDirRel() const {
    return std::string("shared");
  }
305 306 307
  // Name of the shared-with-checksum directory, relative to the backup root.
  inline std::string GetSharedChecksumDirRel() const {
    return std::string("shared_checksum");
  }
I
Igor Canadi 已提交
308
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
309 310
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
311
    assert(file.size() == 0 || file[0] != '/');
312
    return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
I
Igor Canadi 已提交
313
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
314
  }
I
Igor Canadi 已提交
315 316
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
I
Igor Canadi 已提交
317
    assert(file.size() == 0 || file[0] != '/');
318
    return GetSharedDirRel() + "/" + (tmp ? "." : "") + file +
319
           (tmp ? ".tmp" : "");
I
Igor Canadi 已提交
320
  }
321 322 323
  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
                                                  bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
324 325
    return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
           (tmp ? ".tmp" : "");
326
  }
327 328 329 330 331 332 333 334 335 336 337
  // Legacy naming applies when no DB session id is available or when the
  // configured naming scheme is kLegacyCrc32cAndFileSize.
  inline bool UseLegacyNaming(const std::string& sid) const {
    if (sid.empty()) {
      return true;
    }
    return GetNamingNoFlags() ==
           BackupableDBOptions::kLegacyCrc32cAndFileSize;
  }
  // Interim naming applies only when kFlagMatchInterimNaming is set and the
  // session id contains a '-'. SST files written by an early internal 6.12
  // release carry a '-' in the DB session id; later releases use a more
  // concise id without it.
  inline bool UseInterimNaming(const std::string& sid) const {
    if (!(GetNamingFlags() & BackupableDBOptions::kFlagMatchInterimNaming)) {
      return false;
    }
    const bool has_dash = sid.find('-') != std::string::npos;
    return has_dash;
  }
339
  inline std::string GetSharedFileWithChecksum(
340 341 342
      const std::string& file, bool has_checksum,
      const std::string& checksum_hex, const uint64_t file_size,
      const std::string& db_session_id) const {
343 344
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
345 346 347 348 349 350 351 352
    if (UseLegacyNaming(db_session_id)) {
      assert(has_checksum);
      (void)has_checksum;
      file_copy.insert(file_copy.find_last_of('.'),
                       "_" + ToString(ChecksumHexToInt32(checksum_hex)) + "_" +
                           ToString(file_size));
    } else if (UseInterimNaming(db_session_id)) {
      file_copy.insert(file_copy.find_last_of('.'), "_" + db_session_id);
353
    } else {
354 355 356 357 358
      file_copy.insert(file_copy.find_last_of('.'), "_s" + db_session_id);
      if (GetNamingFlags() & BackupableDBOptions::kFlagIncludeFileSize) {
        file_copy.insert(file_copy.find_last_of('.'),
                         "_" + ToString(file_size));
      }
359
    }
360
    return file_copy;
361 362 363 364 365 366 367 368
  }
  // Recovers the plain file name from a shared-with-checksum file name by
  // removing everything from the first '_' up to (but not including) the last
  // '.'. For example "000010_123_456.sst" becomes "000010.sst".
  //
  // A name without any '_' carries no suffix to strip and is returned
  // unchanged; previously that case passed npos to std::string::erase, which
  // throws std::out_of_range. `file` must not begin with '/'.
  inline std::string GetFileFromChecksumFile(const std::string& file) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    const size_t first_underscore = file_copy.find_first_of('_');
    if (first_underscore == std::string::npos) {
      return file_copy;
    }
    return file_copy.erase(first_underscore,
                           file_copy.find_last_of('.') - first_underscore);
  }
I
Igor Canadi 已提交
369 370 371
  // Absolute path of the "meta" directory under the backup root.
  inline std::string GetBackupMetaDir() const {
    const std::string meta_dir_rel("meta");
    return GetAbsolutePath(meta_dir_rel);
  }
372 373
  // Absolute path of the meta file for `backup_id`. When `tmp` is true the
  // file name is wrapped as ".<backup_id>.tmp".
  inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
    std::string path = GetBackupMetaDir();
    path += "/";
    if (tmp) {
      path += ".";
    }
    path += ROCKSDB_NAMESPACE::ToString(backup_id);
    if (tmp) {
      path += ".tmp";
    }
    return path;
  }

377 378 379 380 381 382
  // If size_limit == 0, there is no size limit, copy everything.
  //
  // Exactly one of src and contents must be non-empty.
  //
  // @param src If non-empty, the file is copied from this pathname.
  // @param contents If non-empty, the file will be created with these contents.
383 384 385 386 387 388 389 390 391
  Status CopyOrCreateFile(const std::string& src, const std::string& dst,
                          const std::string& contents, Env* src_env,
                          Env* dst_env, const EnvOptions& src_env_options,
                          bool sync, RateLimiter* rate_limiter,
                          uint64_t* size = nullptr,
                          std::string* checksum_hex = nullptr,
                          uint64_t size_limit = 0,
                          std::function<void()> progress_callback = []() {});

392 393 394 395
  Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env,
                                    const EnvOptions& src_env_options,
                                    uint64_t size_limit,
                                    std::string* checksum_hex);
L
Lei Jin 已提交
396

397
  // Obtain db_id and db_session_id from the table properties of file_path
Z
Zitan Chen 已提交
398 399 400
  Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
                             const std::string& file_path, std::string* db_id,
                             std::string* db_session_id);
401

402
  struct CopyOrCreateResult {
403 404 405 406 407 408 409 410 411 412
    ~CopyOrCreateResult() {
      // The Status needs to be ignored here for two reasons.
      // First, if the BackupEngineImpl shuts down with jobs outstanding, then
      // it is possible that the Status in the future/promise is never read,
      // resulting in an unchecked Status. Second, if there are items in the
      // channel when the BackupEngineImpl is shutdown, these will also have
      // Status that have not been checked.  This
      // TODO: Fix those issues so that the Status
      status.PermitUncheckedError();
    }
413
    uint64_t size;
414
    std::string checksum_hex;
415 416
    std::string db_id;
    std::string db_session_id;
417 418
    Status status;
  };
419 420 421 422 423

  // Exactly one of src_path and contents must be non-empty. If src_path is
  // non-empty, the file is copied from this pathname. Otherwise, if contents is
  // non-empty, the file will be created at dst_path with these contents.
  struct CopyOrCreateWorkItem {
424 425
    std::string src_path;
    std::string dst_path;
426
    std::string contents;
427 428
    Env* src_env;
    Env* dst_env;
429
    EnvOptions src_env_options;
430
    bool sync;
431
    RateLimiter* rate_limiter;
432
    uint64_t size_limit;
433
    std::promise<CopyOrCreateResult> result;
434
    std::function<void()> progress_callback;
435 436
    bool verify_checksum_after_work;
    std::string src_checksum_func_name;
437 438 439
    std::string src_checksum_hex;
    std::string db_id;
    std::string db_session_id;
440

441
    CopyOrCreateWorkItem()
442 443 444 445 446 447 448 449
        : src_path(""),
          dst_path(""),
          contents(""),
          src_env(nullptr),
          dst_env(nullptr),
          src_env_options(),
          sync(false),
          rate_limiter(nullptr),
450 451 452
          size_limit(0),
          verify_checksum_after_work(false),
          src_checksum_func_name(kUnknownFileChecksumFuncName),
453 454 455
          src_checksum_hex(""),
          db_id(""),
          db_session_id("") {}
456

457 458
    CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
    CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
459

460 461 462
    CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
      *this = std::move(o);
    }
463

464
    CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
465 466
      src_path = std::move(o.src_path);
      dst_path = std::move(o.dst_path);
467
      contents = std::move(o.contents);
468 469
      src_env = o.src_env;
      dst_env = o.dst_env;
470
      src_env_options = std::move(o.src_env_options);
471 472 473 474
      sync = o.sync;
      rate_limiter = o.rate_limiter;
      size_limit = o.size_limit;
      result = std::move(o.result);
475
      progress_callback = std::move(o.progress_callback);
476
      verify_checksum_after_work = o.verify_checksum_after_work;
477 478 479 480
      src_checksum_func_name = std::move(o.src_checksum_func_name);
      src_checksum_hex = std::move(o.src_checksum_hex);
      db_id = std::move(o.db_id);
      db_session_id = std::move(o.db_session_id);
481 482 483
      return *this;
    }

484 485 486 487 488
    CopyOrCreateWorkItem(
        std::string _src_path, std::string _dst_path, std::string _contents,
        Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
        RateLimiter* _rate_limiter, uint64_t _size_limit,
        std::function<void()> _progress_callback = []() {},
489
        bool _verify_checksum_after_work = false,
490 491
        const std::string& _src_checksum_func_name =
            kUnknownFileChecksumFuncName,
492 493
        const std::string& _src_checksum_hex = "",
        const std::string& _db_id = "", const std::string& _db_session_id = "")
494 495
        : src_path(std::move(_src_path)),
          dst_path(std::move(_dst_path)),
496
          contents(std::move(_contents)),
497 498
          src_env(_src_env),
          dst_env(_dst_env),
499
          src_env_options(std::move(_src_env_options)),
500 501
          sync(_sync),
          rate_limiter(_rate_limiter),
502
          size_limit(_size_limit),
503 504 505
          progress_callback(_progress_callback),
          verify_checksum_after_work(_verify_checksum_after_work),
          src_checksum_func_name(_src_checksum_func_name),
506 507 508
          src_checksum_hex(_src_checksum_hex),
          db_id(_db_id),
          db_session_id(_db_session_id) {}
509 510
  };

511 512
  struct BackupAfterCopyOrCreateWorkItem {
    std::future<CopyOrCreateResult> result;
513 514 515 516 517 518
    bool shared;
    bool needed_to_copy;
    Env* backup_env;
    std::string dst_path_tmp;
    std::string dst_path;
    std::string dst_relative;
519 520 521 522 523 524 525
    BackupAfterCopyOrCreateWorkItem()
      : shared(false),
        needed_to_copy(false),
        backup_env(nullptr),
        dst_path_tmp(""),
        dst_path(""),
        dst_relative("") {}
526

527 528
    BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
        ROCKSDB_NOEXCEPT {
529 530 531
      *this = std::move(o);
    }

532 533
    BackupAfterCopyOrCreateWorkItem& operator=(
        BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
534 535 536 537 538 539 540 541 542 543
      result = std::move(o.result);
      shared = o.shared;
      needed_to_copy = o.needed_to_copy;
      backup_env = o.backup_env;
      dst_path_tmp = std::move(o.dst_path_tmp);
      dst_path = std::move(o.dst_path);
      dst_relative = std::move(o.dst_relative);
      return *this;
    }

544 545 546 547 548
    BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
                                    bool _shared, bool _needed_to_copy,
                                    Env* _backup_env, std::string _dst_path_tmp,
                                    std::string _dst_path,
                                    std::string _dst_relative)
549 550 551 552 553 554 555 556 557
        : result(std::move(_result)),
          shared(_shared),
          needed_to_copy(_needed_to_copy),
          backup_env(_backup_env),
          dst_path_tmp(std::move(_dst_path_tmp)),
          dst_path(std::move(_dst_path)),
          dst_relative(std::move(_dst_relative)) {}
  };

558 559
  struct RestoreAfterCopyOrCreateWorkItem {
    std::future<CopyOrCreateResult> result;
560 561
    std::string checksum_hex;
    RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {}
562
    RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
563 564
                                     const std::string& _checksum_hex)
        : result(std::move(_result)), checksum_hex(_checksum_hex) {}
565 566
    RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
        ROCKSDB_NOEXCEPT {
567 568 569
      *this = std::move(o);
    }

570 571
    RestoreAfterCopyOrCreateWorkItem& operator=(
        RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
572
      result = std::move(o.result);
573
      checksum_hex = std::move(o.checksum_hex);
574 575
      return *this;
    }
576 577
  };

578
  bool initialized_;
579
  std::mutex byte_report_mutex_;
580
  channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
D
Dmitri Smirnov 已提交
581
  std::vector<port::Thread> threads_;
582
  std::atomic<CpuPriority> threads_cpu_priority_;
583 584 585 586
  // Certain operations like PurgeOldBackups and DeleteBackup will trigger
  // automatic GarbageCollect (true) unless we've already done one in this
  // session and have not failed to delete backup files since then (false).
  bool might_need_garbage_collect_ = true;
587

588 589 590 591 592 593 594 595 596
  // Adds a file to the backup work queue to be copied or created if it doesn't
  // already exist.
  //
  // Exactly one of src_dir and contents must be non-empty.
  //
  // @param src_dir If non-empty, the file in this directory named fname will be
  //    copied.
  // @param fname Name of destination file and, in case of copy, source file.
  // @param contents If non-empty, the file will be created with these contents.
597
  Status AddBackupFileWorkItem(
598
      std::unordered_set<std::string>& live_dst_paths,
599
      std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
600
      BackupID backup_id, bool shared, const std::string& src_dir,
601
      const std::string& fname,  // starts with "/"
602 603
      const EnvOptions& src_env_options, RateLimiter* rate_limiter,
      uint64_t size_bytes, uint64_t size_limit = 0,
604
      bool shared_checksum = false,
605
      std::function<void()> progress_callback = []() {},
606 607 608
      const std::string& contents = std::string(),
      const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
      const std::string& src_checksum_str = kUnknownFileChecksum);
609

I
Igor Canadi 已提交
610 611
  // backup state data
  BackupID latest_backup_id_;
612
  BackupID latest_valid_backup_id_;
613 614 615
  std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
  std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
      corrupt_backups_;
616 617
  std::unordered_map<std::string,
                     std::shared_ptr<FileInfo>> backuped_file_infos_;
I
Igor Canadi 已提交
618
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
619 620 621 622 623 624

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

I
Igor Canadi 已提交
625
  // directories
626 627 628 629
  std::unique_ptr<Directory> backup_directory_;
  std::unique_ptr<Directory> shared_directory_;
  std::unique_ptr<Directory> meta_directory_;
  std::unique_ptr<Directory> private_directory_;
I
Igor Canadi 已提交
630

I
Igor Canadi 已提交
631 632
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
I
Igor Canadi 已提交
633
  bool read_only_;
634
  BackupStatistics backup_statistics_;
635
  static const size_t kMaxAppMetaSize = 1024 * 1024;  // 1MB
I
Igor Canadi 已提交
636 637
};

638
// Creates and initializes a backup engine.
//
// On success *backup_engine_ptr receives ownership of the new engine and OK
// is returned. If initialization fails, the engine is destroyed,
// *backup_engine_ptr is set to nullptr and the failing Status is returned.
Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
                          BackupEngine** backup_engine_ptr) {
  std::unique_ptr<BackupEngineImpl> engine(new BackupEngineImpl(options, env));
  Status init_status = engine->Initialize();
  if (init_status.ok()) {
    *backup_engine_ptr = engine.release();
    return Status::OK();
  }
  *backup_engine_ptr = nullptr;
  return init_status;
}

651 652
// Captures the options and environments. The backup env falls back to the DB
// env when the options do not supply one. For backup and restore alike, a
// generic rate limiter is created when a positive rate limit is configured
// but no limiter object was provided.
BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options,
                                   Env* db_env, bool read_only)
    : initialized_(false),
      threads_cpu_priority_(),
      latest_backup_id_(0),
      latest_valid_backup_id_(0),
      stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {
  const bool need_backup_limiter = options_.backup_rate_limit > 0 &&
                                   options_.backup_rate_limiter == nullptr;
  if (need_backup_limiter) {
    options_.backup_rate_limiter.reset(
        NewGenericRateLimiter(options_.backup_rate_limit));
  }

  const bool need_restore_limiter = options_.restore_rate_limit > 0 &&
                                    options_.restore_rate_limiter == nullptr;
  if (need_restore_limiter) {
    options_.restore_rate_limiter.reset(
        NewGenericRateLimiter(options_.restore_rate_limit));
  }
}
674 675

// Sends end-of-stream on the work channel, joins every worker thread, flushes
// the info log, and finally marks the Status of each corrupt backup as
// intentionally unchecked.
BackupEngineImpl::~BackupEngineImpl() {
  files_to_copy_or_create_.sendEof();
  for (auto& worker : threads_) {
    worker.join();
  }
  LogFlush(options_.info_log);
  for (const auto& corrupt_entry : corrupt_backups_) {
    const Status& load_status = corrupt_entry.second.first;
    load_status.PermitUncheckedError();
  }
}
685

686 687 688
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
I
Igor Canadi 已提交
689
  if (read_only_) {
690
    ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
I
Igor Canadi 已提交
691
  }
I
Igor Canadi 已提交
692 693
  options_.Dump(options_.info_log);

I
Igor Canadi 已提交
694
  if (!read_only_) {
695 696 697
    // we might need to clean up from previous crash or I/O errors
    might_need_garbage_collect_ = true;

698 699 700 701 702 703 704 705
    if (options_.max_valid_backups_to_open != port::kMaxInt32) {
      options_.max_valid_backups_to_open = port::kMaxInt32;
      ROCKS_LOG_WARN(
          options_.info_log,
          "`max_valid_backups_to_open` is not set to the default value. Ignoring "
          "its value since BackupEngine is not read-only.");
    }

706 707 708 709
    // gather the list of directories that we need to create
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
I
Igor Canadi 已提交
710
    if (options_.share_table_files) {
711
      if (options_.share_files_with_checksum) {
712 713 714
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
715
      } else {
716 717 718 719 720 721 722 723 724 725 726 727 728 729 730
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
731
      }
I
Igor Canadi 已提交
732
    }
I
Igor Canadi 已提交
733
  }
I
Igor Canadi 已提交
734 735

  std::vector<std::string> backup_meta_files;
736 737
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
738
    if (s.IsNotFound()) {
W
Wanning Jiang 已提交
739
      return Status::NotFound(GetBackupMetaDir() + " is missing");
740 741
    } else if (!s.ok()) {
      return s;
742 743
    }
  }
I
Igor Canadi 已提交
744 745
  // create backups_ structure
  for (auto& file : backup_meta_files) {
746
    ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
I
Igor Canadi 已提交
747 748
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
749
    if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
I
Igor Canadi 已提交
750 751
      if (!read_only_) {
        // invalid file name, delete that
752
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
753 754 755
        ROCKS_LOG_INFO(options_.info_log,
                       "Unrecognized meta file %s, deleting -- %s",
                       file.c_str(), s.ToString().c_str());
I
Igor Canadi 已提交
756
      }
I
Igor Canadi 已提交
757 758 759
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
760 761 762
    // Insert all the (backup_id, BackupMeta) that will be loaded later
    // The loading performed later will check whether there are corrupt backups
    // and move the corrupt backups to corrupt_backups_
763
    backups_.insert(std::make_pair(
764
        backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
765 766 767
                       GetBackupMetaFile(backup_id, false /* tmp */),
                       GetBackupMetaFile(backup_id, true /* tmp */),
                       &backuped_file_infos_, backup_env_))));
I
Igor Canadi 已提交
768 769
  }

770
  latest_backup_id_ = 0;
771
  latest_valid_backup_id_ = 0;
C
clark.kang 已提交
772
  if (options_.destroy_old_data) {  // Destroy old data
I
Igor Canadi 已提交
773
    assert(!read_only_);
774 775
    ROCKS_LOG_INFO(
        options_.info_log,
I
Igor Canadi 已提交
776 777
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
778 779 780 781 782 783 784
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
785
  } else {  // Load data from storage
786 787
    // abs_path_to_size: maps absolute paths of files in backup directory to
    // their corresponding sizes
788
    std::unordered_map<std::string, uint64_t> abs_path_to_size;
789 790
    // Insert files and their sizes in backup sub-directories (shared and
    // shared_checksum) to abs_path_to_size
791 792 793
    for (const auto& rel_dir :
         {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
      const auto abs_dir = GetAbsolutePath(rel_dir);
794 795 796
      // TODO: What to do on error?
      InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size)
          .PermitUncheckedError();
797
    }
A
Andrew Kryczka 已提交
798 799
    // load the backups if any, until valid_backups_to_open of the latest
    // non-corrupted backups have been successfully opened.
800
    int valid_backups_to_open = options_.max_valid_backups_to_open;
A
Andrew Kryczka 已提交
801
    for (auto backup_iter = backups_.rbegin();
802
         backup_iter != backups_.rend();
A
Andrew Kryczka 已提交
803
         ++backup_iter) {
804 805 806 807
      assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
      if (latest_backup_id_ == 0) {
        latest_backup_id_ = backup_iter->first;
      }
808 809 810 811
      if (valid_backups_to_open == 0) {
        break;
      }

812 813
      // Insert files and their sizes in backup sub-directories
      // (private/backup_id) to abs_path_to_size
814
      Status s = InsertPathnameToSizeBytes(
A
Andrew Kryczka 已提交
815
          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
816
          &abs_path_to_size);
817 818 819 820
      if (s.ok()) {
        s = backup_iter->second->LoadFromFile(options_.backup_dir,
                                              abs_path_to_size);
      }
821
      if (s.IsCorruption()) {
822
        ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
A
Andrew Kryczka 已提交
823 824 825 826
                       backup_iter->first, s.ToString().c_str());
        corrupt_backups_.insert(
            std::make_pair(backup_iter->first,
                           std::make_pair(s, std::move(backup_iter->second))));
827 828 829 830 831
      } else if (!s.ok()) {
        // Distinguish corruption errors from errors in the backup Env.
        // Errors in the backup Env (i.e., this code path) will cause Open() to
        // fail, whereas corruption errors would not cause Open() failures.
        return s;
I
Igor Canadi 已提交
832
      } else {
833
        ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
A
Andrew Kryczka 已提交
834 835
                       backup_iter->first,
                       backup_iter->second->GetInfoString().c_str());
836 837 838 839 840
        assert(latest_valid_backup_id_ == 0 ||
               latest_valid_backup_id_ > backup_iter->first);
        if (latest_valid_backup_id_ == 0) {
          latest_valid_backup_id_ = backup_iter->first;
        }
A
Andrew Kryczka 已提交
841
        --valid_backups_to_open;
I
Igor Canadi 已提交
842 843
      }
    }
H
Hasnain Lakhani 已提交
844

845
    for (const auto& corrupt : corrupt_backups_) {
H
Hasnain Lakhani 已提交
846
      backups_.erase(backups_.find(corrupt.first));
I
Igor Canadi 已提交
847
    }
A
Andrew Kryczka 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860
    // erase the backups before max_valid_backups_to_open
    int num_unopened_backups;
    if (options_.max_valid_backups_to_open == 0) {
      num_unopened_backups = 0;
    } else {
      num_unopened_backups =
          std::max(0, static_cast<int>(backups_.size()) -
                          options_.max_valid_backups_to_open);
    }
    for (int i = 0; i < num_unopened_backups; ++i) {
      assert(backups_.begin()->second->Empty());
      backups_.erase(backups_.begin());
    }
I
Igor Canadi 已提交
861 862
  }

863
  ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
864 865
  ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
                 latest_valid_backup_id_);
I
Igor Canadi 已提交
866

867 868
  // set up threads to perform copies from files_to_copy_or_create_ in the
  // background
869 870
  threads_cpu_priority_ = CpuPriority::kNormal;
  threads_.reserve(options_.max_background_operations);
871
  for (int t = 0; t < options_.max_background_operations; t++) {
872
    threads_.emplace_back([this]() {
873 874 875 876 877
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
      pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
878
      CpuPriority current_priority = CpuPriority::kNormal;
879 880
      CopyOrCreateWorkItem work_item;
      while (files_to_copy_or_create_.read(work_item)) {
881 882 883 884 885 886 887
        CpuPriority priority = threads_cpu_priority_;
        if (current_priority != priority) {
          TEST_SYNC_POINT_CALLBACK(
              "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
          port::SetCpuPriority(0, priority);
          current_priority = priority;
        }
888 889 890
        CopyOrCreateResult result;
        result.status = CopyOrCreateFile(
            work_item.src_path, work_item.dst_path, work_item.contents,
891
            work_item.src_env, work_item.dst_env, work_item.src_env_options,
892 893 894
            work_item.sync, work_item.rate_limiter, &result.size,
            &result.checksum_hex, work_item.size_limit,
            work_item.progress_callback);
895 896
        result.db_id = work_item.db_id;
        result.db_session_id = work_item.db_session_id;
897
        if (result.status.ok() && work_item.verify_checksum_after_work) {
898 899 900 901
          // unknown checksum function name implies no db table file checksum in
          // db manifest; work_item.verify_checksum_after_work being true means
          // backup engine has calculated its crc32c checksum for the table
          // file; therefore, we are able to compare the checksums.
902 903
          if (work_item.src_checksum_func_name ==
                  kUnknownFileChecksumFuncName ||
904 905
              work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
            if (work_item.src_checksum_hex != result.checksum_hex) {
906 907
              std::string checksum_info(
                  "Expected checksum is " + work_item.src_checksum_hex +
908 909 910 911
                  " while computed checksum is " + result.checksum_hex);
              result.status =
                  Status::Corruption("Checksum mismatch after copying to " +
                                     work_item.dst_path + ": " + checksum_info);
912 913 914 915 916 917
            }
          } else {
            std::string checksum_function_info(
                "Existing checksum function is " +
                work_item.src_checksum_func_name +
                " while provided checksum function is " +
918
                kBackupFileChecksumFuncName);
919 920 921 922 923 924
            ROCKS_LOG_INFO(
                options_.info_log,
                "Unable to verify checksum after copying to %s: %s\n",
                work_item.dst_path.c_str(), checksum_function_info.c_str());
          }
        }
925 926 927 928
        work_item.result.set_value(std::move(result));
      }
    });
  }
929
  ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
I
Igor Canadi 已提交
930

931
  return Status::OK();
932
}
I
Igor Canadi 已提交
933

934
// Creates backup number `latest_backup_id_ + 1` from the live files of `db`
// and attaches `app_metadata` to it.
//
// Outline:
//   1. Validate the metadata size and optionally adjust the CPU priority that
//      the background copy threads pick up.
//   2. If a private directory for the new backup ID already exists, run
//      GarbageCollect() first; then register a fresh BackupMeta in backups_.
//   3. With file deletions disabled on `db`, let
//      CheckpointImpl::CreateCustomCheckpoint() enumerate the live files and
//      queue one work item per file through AddBackupFileWorkItem().
//   4. Wait for every queued item, rename shared files that were copied under
//      a temporary name, and record each file in the new BackupMeta.
//   5. Persist the backup metadata and, when options_.sync is set, fsync the
//      involved directories.
//
// Returns InvalidArgument if `app_metadata` exceeds kMaxAppMetaSize. On any
// other failure the new backup is removed again via DeleteBackup() and the
// failing status is returned. On success latest_backup_id_ and
// latest_valid_backup_id_ are advanced to the new ID.
Status BackupEngineImpl::CreateNewBackupWithMetadata(
    const CreateBackupOptions& options, DB* db,
    const std::string& app_metadata) {
  assert(initialized_);
  assert(!read_only_);
  if (app_metadata.size() > kMaxAppMetaSize) {
    return Status::InvalidArgument("App metadata too large");
  }

  // The shared priority is only replaced when the requested value compares
  // less than the current one (as the option name suggests, this is meant to
  // only ever decrease the priority). Worker threads read
  // threads_cpu_priority_ before processing each work item.
  if (options.decrease_background_thread_cpu_priority) {
    if (options.background_thread_cpu_priority < threads_cpu_priority_) {
      threads_cpu_priority_.store(options.background_thread_cpu_priority);
    }
  }

  BackupID new_backup_id = latest_backup_id_ + 1;

  assert(backups_.find(new_backup_id) == backups_.end());

  auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
  Status s = backup_env_->FileExists(private_dir);
  if (s.ok()) {
    // maybe last backup failed and left partial state behind, clean it up.
    // need to do this before updating backups_ such that a private dir
    // named after new_backup_id will be cleaned up.
    // (If an incomplete new backup is followed by an incomplete delete
    // of the latest full backup, then there could be more than one next
    // id with a private dir, the last thing to be deleted in delete
    // backup, but all will be cleaned up with a GarbageCollect.)
    s = GarbageCollect();
  } else if (s.IsNotFound()) {
    // normal case, the new backup's private dir doesn't exist yet
    s = Status::OK();
  }
  // Any other FileExists() error stays in `s`; the steps guarded by s.ok()
  // below are then skipped and the failure path at the end cleans up.

  auto ret = backups_.insert(std::make_pair(
      new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                         GetBackupMetaFile(new_backup_id, false /* tmp */),
                         GetBackupMetaFile(new_backup_id, true /* tmp */),
                         &backuped_file_infos_, backup_env_))));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  // TODO: What should we do on error here?
  new_backup->RecordTimestamp().PermitUncheckedError();
  new_backup->SetAppMetadata(app_metadata);

  auto start_backup = backup_env_->NowMicros();

  ROCKS_LOG_INFO(options_.info_log,
                 "Started the backup process -- creating backup %u",
                 new_backup_id);
  if (s.ok()) {
    s = backup_env_->CreateDir(private_dir);
  }

  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
  if (rate_limiter) {
    copy_file_buffer_size_ =
        static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  }

  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live files shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;

  std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
  // Add a CopyOrCreateWorkItem to the channel for each live file
  Status disabled = db->DisableFileDeletions();
  if (s.ok()) {
    CheckpointImpl checkpoint(db);
    uint64_t sequence_number = 0;
    DBOptions db_options = db->GetDBOptions();
    FileChecksumGenFactory* db_checksum_factory =
        db_options.file_checksum_gen_factory.get();
    const std::string kFileChecksumGenFactoryName =
        "FileChecksumGenCrc32cFactory";
    // True only when the DB has a checksum factory configured and its name
    // matches the crc32c factory name above.
    const bool compare_checksum =
        db_checksum_factory != nullptr &&
        db_checksum_factory->Name() == kFileChecksumGenFactoryName;
    EnvOptions src_raw_env_options(db_options);
    s = checkpoint.CreateCustomCheckpoint(
        db_options,
        [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
            FileType) {
          // custom checkpoint will switch to calling copy_file_cb after it sees
          // NotSupported returned from link_file_cb.
          return Status::NotSupported();
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType type,
            const std::string& checksum_func_name,
            const std::string& checksum_val) {
          if (type == kWalFile && !options_.backup_log_files) {
            return Status::OK();
          }
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          // The source size is only looked up for table files; for every
          // other file type size_bytes is passed on as 0.
          uint64_t size_bytes = 0;
          Status st;
          if (type == kTableFile) {
            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
          }
          EnvOptions src_env_options;
          switch (type) {
            case kWalFile:
              src_env_options =
                  db_env_->OptimizeForLogRead(src_raw_env_options);
              break;
            case kTableFile:
              src_env_options = db_env_->OptimizeForCompactionTableRead(
                  src_raw_env_options, ImmutableDBOptions(db_options));
              break;
            case kDescriptorFile:
              src_env_options =
                  db_env_->OptimizeForManifestRead(src_raw_env_options);
              break;
            default:
              // Other backed up files (like options file) are not read by live
              // DB, so don't need to worry about avoiding mixing buffered and
              // direct I/O. Just use plain defaults.
              src_env_options = src_raw_env_options;
              break;
          }
          if (st.ok()) {
            st = AddBackupFileWorkItem(
                live_dst_paths, backup_items_to_finish, new_backup_id,
                options_.share_table_files && type == kTableFile, src_dirname,
                fname, src_env_options, rate_limiter, size_bytes,
                size_limit_bytes,
                options_.share_files_with_checksum && type == kTableFile,
                options.progress_callback, "" /* contents */,
                checksum_func_name, checksum_val);
          }
          return st;
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents, FileType) {
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          return AddBackupFileWorkItem(
              live_dst_paths, backup_items_to_finish, new_backup_id,
              false /* shared */, "" /* src_dir */, fname,
              EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
              0 /* size_limit */, false /* shared_checksum */,
              options.progress_callback, contents);
        } /* create_file_cb */,
        &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
        compare_checksum);
    if (s.ok()) {
      new_backup->SetSequenceNumber(sequence_number);
    }
  }
  ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
  // Every queued item is waited for, even after an earlier failure. If
  // several items fail, `s` ends up holding the status of the last one.
  Status item_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    if (item_status.ok() && item.shared && item.needed_to_copy) {
      // Shared files that had to be copied were written to dst_path_tmp;
      // move them to their final name now that the copy succeeded.
      item_status =
          item.backup_env->RenameFile(item.dst_path_tmp, item.dst_path);
    }
    if (item_status.ok()) {
      item_status = new_backup->AddFile(std::make_shared<FileInfo>(
          item.dst_relative, result.size, result.checksum_hex, result.db_id,
          result.db_session_id));
    }
    if (!item_status.ok()) {
      s = item_status;
    }
  }

  // we copied all the files, enable file deletions
  if (disabled.ok()) {  // If we successfully disabled file deletions
    db->EnableFileDeletions(false).PermitUncheckedError();
  }
  auto backup_time = backup_env_->NowMicros() - start_backup;

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup->StoreToFile(options_.sync);
  }
  if (s.ok() && options_.sync) {
    std::unique_ptr<Directory> backup_private_directory;
    // Best effort: a failure to open the private directory is deliberately
    // ignored (its fsync is then skipped if the pointer is still null). Mark
    // the returned status as intentionally unchecked, as done for the other
    // ignored statuses in this file, instead of dropping it silently.
    backup_env_
        ->NewDirectory(
            GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
            &backup_private_directory)
        .PermitUncheckedError();
    if (backup_private_directory != nullptr) {
      s = backup_private_directory->Fsync();
    }
    if (s.ok() && private_directory_ != nullptr) {
      s = private_directory_->Fsync();
    }
    if (s.ok() && meta_directory_ != nullptr) {
      s = meta_directory_->Fsync();
    }
    if (s.ok() && shared_directory_ != nullptr) {
      s = shared_directory_->Fsync();
    }
    if (s.ok() && backup_directory_ != nullptr) {
      s = backup_directory_->Fsync();
    }
  }

  if (s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
  }
  if (!s.ok()) {
    backup_statistics_.IncrementNumberFailBackup();
    // clean all the files we might have created
    ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
                   s.ToString().c_str());
    ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                   backup_statistics_.ToString().c_str());
    // delete files that we might have already written
    might_need_garbage_collect_ = true;
    DeleteBackup(new_backup_id).PermitUncheckedError();
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  latest_valid_backup_id_ = new_backup_id;
  ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");

  // backup_speed is in MB/s: backup_time is in microseconds, so
  // bytes / microsecond equals 10^6 bytes/s, and dividing by 1.048576
  // converts that to units of 2^20 bytes per second.
  double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
  ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
                 new_backup->GetNumberFiles());
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
  ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
                 backup_time);
  ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
                 backup_statistics_.ToString().c_str());
  return s;
}

I
Igor Canadi 已提交
1175
// Deletes the backups with the smallest IDs until at most
// `num_backups_to_keep` entries remain in backups_. Deletion is best effort:
// a failure on one backup does not stop the others from being attempted, and
// the status of the last failed deletion is returned. Afterwards
// GarbageCollect() runs if might_need_garbage_collect_ is set; its failure is
// reported only when every deletion succeeded.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(initialized_);
  assert(!read_only_);

  ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
                 num_backups_to_keep);

  // Collect the IDs first: DeleteBackupInternal() erases from backups_, so
  // the map must not be modified while it is being walked.
  std::vector<BackupID> doomed_ids;
  for (auto it = backups_.begin();
       (backups_.size() - doomed_ids.size()) > num_backups_to_keep; ++it) {
    doomed_ids.push_back(it->first);
  }

  // Best effort deletion even with errors
  Status result = Status::OK();
  for (auto doomed_id : doomed_ids) {
    auto delete_status = DeleteBackupInternal(doomed_id);
    if (!delete_status.ok()) {
      result = delete_status;
    }
  }

  // Clean up after any incomplete backup deletion, potentially from
  // earlier session.
  if (might_need_garbage_collect_) {
    auto gc_status = GarbageCollect();
    if (!gc_status.ok() && result.ok()) {
      result = gc_status;
    }
  }
  return result;
}

I
Igor Canadi 已提交
1207
// Deletes backup `backup_id` and then, if might_need_garbage_collect_ is set,
// runs GarbageCollect(). A deletion failure takes precedence over a garbage
// collection failure in the returned status.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  auto delete_status = DeleteBackupInternal(backup_id);
  auto gc_status = Status::OK();

  // Clean up after any incomplete backup deletion, potentially from
  // earlier session.
  if (might_need_garbage_collect_) {
    gc_status = GarbageCollect();
  }

  if (delete_status.ok()) {
    return gc_status;
  }
  gc_status.PermitUncheckedError();  // What to do?
  return delete_status;
}

// Deletes backup `backup_id` without running GarbageCollect() afterwards.
//
// The backup is looked up in backups_ first and in corrupt_backups_ second;
// NotFound is returned when it is in neither. If deleting the backup's meta
// data fails, that status is returned and nothing else is touched. Everything
// after that point is best effort: failures only set
// might_need_garbage_collect_ and the function still returns OK.
Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
  assert(initialized_);
  assert(!read_only_);

  ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
  auto live_it = backups_.find(backup_id);
  if (live_it == backups_.end()) {
    auto corrupt_it = corrupt_backups_.find(backup_id);
    if (corrupt_it == corrupt_backups_.end()) {
      return Status::NotFound("Backup not found");
    }
    auto meta_status = corrupt_it->second.second->Delete();
    if (!meta_status.ok()) {
      return meta_status;
    }
    // The stored corruption status is discarded together with the entry.
    corrupt_it->second.first.PermitUncheckedError();
    corrupt_backups_.erase(corrupt_it);
  } else {
    auto meta_status = live_it->second->Delete();
    if (!meta_status.ok()) {
      return meta_status;
    }
    backups_.erase(live_it);
  }

  // After removing meta file, best effort deletion even with errors.
  // (Don't delete other files if we can't delete the meta file right
  // now.)
  std::vector<std::string> unreferenced;
  for (auto& entry : backuped_file_infos_) {
    if (entry.second->refs != 0) {
      continue;
    }
    Status file_status = backup_env_->DeleteFile(GetAbsolutePath(entry.first));
    ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", entry.first.c_str(),
                   file_status.ToString().c_str());
    unreferenced.push_back(entry.first);
    if (!file_status.ok()) {
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }
  // Drop the bookkeeping entries only after the walk over the container.
  for (auto& name : unreferenced) {
    backuped_file_infos_.erase(name);
  }

  // take care of private dirs -- GarbageCollect() will take care of them
  // if they are not empty
  std::string private_rel = GetPrivateFileRel(backup_id);
  Status dir_status = backup_env_->DeleteDir(GetAbsolutePath(private_rel));
  ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
                 private_rel.c_str(), dir_status.ToString().c_str());
  if (!dir_status.ok()) {
    // Full gc or trying again later might work
    might_need_garbage_collect_ = true;
  }
  return Status::OK();
}

I
Igor Canadi 已提交
1284
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1285
  assert(initialized_);
I
Igor Canadi 已提交
1286 1287
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
1288
    if (!backup.second->Empty()) {
1289 1290 1291
      backup_info->push_back(BackupInfo(
          backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
          backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
I
Igor Canadi 已提交
1292 1293 1294 1295
    }
  }
}

H
Hasnain Lakhani 已提交
1296 1297 1298
void
BackupEngineImpl::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
1299
  assert(initialized_);
H
Hasnain Lakhani 已提交
1300 1301 1302 1303 1304 1305
  corrupt_backup_ids->reserve(corrupt_backups_.size());
  for (auto& backup : corrupt_backups_) {
    corrupt_backup_ids->push_back(backup.first);
  }
}

1306 1307 1308 1309
// Restores backup `backup_id` into `db_dir` (and `wal_dir` for WAL files).
//
// Returns the stored status if the backup is listed in corrupt_backups_, and
// NotFound if it is unknown or Empty(). The target directories are cleared
// first; with options.keep_log_files the WAL files are kept and the WAL files
// found in the archive directory are moved into `wal_dir`. Every file of the
// backup is then queued on files_to_copy_or_create_ and the results are
// collected. The first failed copy or checksum mismatch ends the collection
// and determines the returned status.
Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
                                             BackupID backup_id,
                                             const std::string& db_dir,
                                             const std::string& wal_dir) {
  assert(initialized_);
  auto corrupt_pos = corrupt_backups_.find(backup_id);
  if (corrupt_pos != corrupt_backups_.end()) {
    return corrupt_pos->second.first;
  }
  auto backup_pos = backups_.find(backup_id);
  if (backup_pos == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_pos->second;
  if (backup->Empty()) {
    return Status::NotFound("Backup not found");
  }

  ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
  ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
                 static_cast<int>(options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir).PermitUncheckedError();
  db_env_->CreateDirIfMissing(wal_dir).PermitUncheckedError();

  if (!options.keep_log_files) {
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  } else {
    // delete files in db_dir, but keep all the log files
    DeleteChildren(db_dir, 1 << kWalFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archived_names;
    db_env_->GetChildren(archive_dir, &archived_names)
        .PermitUncheckedError();  // ignore errors
    for (const auto& name : archived_names) {
      uint64_t file_number;
      FileType file_type;
      // Entries that do not parse, or are not WAL files, stay where they are.
      if (!ParseFileName(name, &file_number, &file_type) ||
          file_type != kWalFile) {
        continue;
      }
      ROCKS_LOG_INFO(options_.info_log,
                     "Moving log file from archive/ to wal_dir: %s",
                     name.c_str());
      Status move_status =
          db_env_->RenameFile(archive_dir + "/" + name, wal_dir + "/" + name);
      if (!move_status.ok()) {
        // if we can't move log file from archive_dir to wal_dir,
        // we should fail, since it might mean data loss
        return move_status;
      }
    }
  }

  RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
  if (rate_limiter) {
    copy_file_buffer_size_ =
        static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  }
  Status restore_status;
  std::vector<RestoreAfterCopyOrCreateWorkItem> pending_items;
  for (const auto& file_info : backup->GetFiles()) {
    const std::string& backup_rel_path = file_info->filename;
    // 1. extract the filename
    // file will either be shared/<file>, shared_checksum/<file_crc32c_size>,
    // shared_checksum/<file_session>, shared_checksum/<file_crc32c_session>,
    // or private/<number>/<file>
    size_t last_slash = backup_rel_path.find_last_of('/');
    assert(last_slash != std::string::npos);
    std::string dst = backup_rel_path.substr(last_slash + 1);

    // if the file was in shared_checksum, extract the real file name
    // in this case the file is <number>_<checksum>_<size>.<type>,
    // <number>_<session>.<type>, or <number>_<checksum>_<session>.<type>
    if (backup_rel_path.substr(0, last_slash) == GetSharedChecksumDirRel()) {
      dst = GetFileFromChecksumFile(dst);
    }

    // 2. find the filetype
    uint64_t file_number;
    FileType file_type;
    if (!ParseFileName(dst, &file_number, &file_type)) {
      return Status::Corruption("Backup corrupted: Fail to parse filename " +
                                dst);
    }
    // 3. Construct the final path
    // kWalFile lives in wal_dir and all the rest live in db_dir
    const std::string& target_dir = (file_type == kWalFile) ? wal_dir : db_dir;
    dst = target_dir + "/" + dst;

    ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n",
                   backup_rel_path.c_str(), dst.c_str());
    CopyOrCreateWorkItem copy_item(
        GetAbsolutePath(backup_rel_path), dst, "" /* contents */, backup_env_,
        db_env_, EnvOptions() /* src_env_options */, false, rate_limiter,
        0 /* size_limit */);
    // Take the future before the work item is moved into the channel.
    RestoreAfterCopyOrCreateWorkItem follow_up_item(
        copy_item.result.get_future(), file_info->checksum_hex);
    files_to_copy_or_create_.write(std::move(copy_item));
    pending_items.push_back(std::move(follow_up_item));
  }
  Status item_status;
  for (auto& pending : pending_items) {
    pending.result.wait();
    auto copy_result = pending.result.get();
    item_status = copy_result.status;
    // Note: It is possible that both of the following bad-status cases occur
    // during copying. But, we only return one status.
    if (!item_status.ok()) {
      restore_status = item_status;
      break;
    }
    if (pending.checksum_hex != copy_result.checksum_hex) {
      restore_status = Status::Corruption("Checksum check failed");
      break;
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
                 restore_status.ToString().c_str());
  return restore_status;
}

1433 1434 1435
// Checks that every file registered for `backup_id` is present in the backup
// directory with the recorded size and, when `verify_with_checksum` is set,
// with the recorded checksum.
//
// Returns:
//  - the status stored for the backup if it is listed in corrupt_backups_;
//  - NotFound if the backup is unknown, empty, or one of its files is missing;
//  - Corruption on a size or checksum mismatch;
//  - any error produced while reading a file to compute its checksum;
//  - OK otherwise.
Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
                                      bool verify_with_checksum) {
  assert(initialized_);

  // A backup already known to be corrupt reports the status recorded for it.
  const auto corrupt_pos = corrupt_backups_.find(backup_id);
  if (corrupt_pos != corrupt_backups_.end()) {
    return corrupt_pos->second.first;
  }

  // Unregistered and empty backups are both reported as NotFound.
  const auto backup_pos = backups_.find(backup_id);
  if (backup_pos == backups_.end() || backup_pos->second->Empty()) {
    return Status::NotFound();
  }
  auto& backup = backup_pos->second;

  ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);

  // Collect the on-disk size of everything under the directories that may
  // hold files of this backup: its private directory and both shared ones.
  std::unordered_map<std::string, uint64_t> on_disk_sizes;
  for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
                              GetSharedFileWithChecksumRel()}) {
    // TODO: What to do on error?
    InsertPathnameToSizeBytes(GetAbsolutePath(rel_dir), backup_env_,
                              &on_disk_sizes)
        .PermitUncheckedError();
  }

  // Compare every registered file against what was found on disk.
  for (const auto& file_info : backup->GetFiles()) {
    const auto abs_path = GetAbsolutePath(file_info->filename);

    // The file has to exist ...
    const auto size_pos = on_disk_sizes.find(abs_path);
    if (size_pos == on_disk_sizes.end()) {
      return Status::NotFound("File missing: " + abs_path);
    }

    // ... with the recorded size ...
    const uint64_t found_size = size_pos->second;
    if (file_info->size != found_size) {
      std::string size_info("Expected file size is " +
                            ToString(file_info->size) +
                            " while found file size is " +
                            ToString(found_size));
      return Status::Corruption("File corrupted: File size mismatch for " +
                                abs_path + ": " + size_info);
    }

    if (!verify_with_checksum) {
      continue;
    }

    // ... and, on request, with the recorded checksum.
    ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
                   abs_path.c_str());
    std::string computed_hex;
    Status s = ReadFileAndComputeChecksum(abs_path, backup_env_, EnvOptions(),
                                          0 /* size_limit */, &computed_hex);
    if (!s.ok()) {
      return s;
    }
    if (file_info->checksum_hex != computed_hex) {
      std::string checksum_info(
          "Expected checksum is " + file_info->checksum_hex +
          " while computed checksum is " + computed_hex);
      return Status::Corruption("File corrupted: Checksum mismatch for " +
                                abs_path + ": " + checksum_info);
    }
  }
  return Status::OK();
}

1501 1502
// Creates `dst` (through `dst_env`) and fills it either by streaming `src`
// (read through `src_env`) or, when `src` is empty, by appending `contents`
// once. Exactly one of `src` and `contents` is expected to be non-empty.
//
//  - `size`, if non-null, receives the number of bytes appended.
//  - `checksum_hex`, if non-null, receives the hex-encoded crc32c of the data.
//  - `size_limit` caps the number of bytes copied; 0 means no limit.
//  - `sync` requests a Sync() of the destination before it is closed.
//  - `rate_limiter`, if non-null, is charged for every chunk as a
//    low-priority write.
//  - `progress_callback` is invoked under byte_report_mutex_ whenever the
//    bytes requested since the last trigger exceed
//    options_.callback_trigger_interval_size.
//
// Returns Incomplete("Backup stopped") when stop_backup_ is observed set,
// otherwise the first failing status from file creation, reading, appending,
// syncing or closing.
Status BackupEngineImpl::CopyOrCreateFile(
    const std::string& src, const std::string& dst, const std::string& contents,
    Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
    RateLimiter* rate_limiter, uint64_t* size, std::string* checksum_hex,
    uint64_t size_limit, std::function<void()> progress_callback) {
  assert(src.empty() != contents.empty());
  const bool copying = !src.empty();

  if (size != nullptr) {
    *size = 0;
  }
  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  EnvOptions dst_env_options;
  dst_env_options.use_mmap_writes = false;
  // TODO:(gzh) maybe use direct reads/writes here if possible

  // Open the destination first, then the source (only when copying).
  std::unique_ptr<WritableFile> dst_file;
  std::unique_ptr<SequentialFile> src_file;
  Status s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
  if (s.ok() && copying) {
    s = src_env->NewSequentialFile(src, &src_file, src_env_options);
  }
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
      NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
  std::unique_ptr<SequentialFileReader> src_reader;
  std::unique_ptr<char[]> buf;
  if (copying) {
    src_reader.reset(new SequentialFileReader(
        NewLegacySequentialFileWrapper(src_file), src));
    buf.reset(new char[copy_file_buffer_size_]);
  }

  Slice data;
  uint32_t running_crc = 0;
  uint64_t bytes_since_callback = 0;
  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }

    if (copying) {
      // Never request more than the remaining size limit.
      size_t chunk = copy_file_buffer_size_;
      if (size_limit <= chunk) {
        chunk = static_cast<size_t>(size_limit);
      }
      s = src_reader->Read(chunk, &data, buf.get());
      bytes_since_callback += chunk;
    } else {
      data = contents;
    }
    size_limit -= data.size();

    // Test hook: lets tests tamper with the data of .sst files in flight.
    TEST_SYNC_POINT_CALLBACK(
        "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
        (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
                                                                    : nullptr);

    if (!s.ok()) {
      return s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_hex != nullptr) {
      running_crc = crc32c::Extend(running_crc, data.data(), data.size());
    }

    s = dest_writer->Append(data);

    if (rate_limiter != nullptr) {
      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                            RateLimiter::OpType::kWrite);
    }

    if (bytes_since_callback > options_.callback_trigger_interval_size) {
      bytes_since_callback -= options_.callback_trigger_interval_size;
      std::lock_guard<std::mutex> lock(byte_report_mutex_);
      progress_callback();
    }
  } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);

  // Convert uint32_t checksum to hex checksum
  if (checksum_hex != nullptr) {
    checksum_hex->assign(ChecksumInt32ToHex(running_crc));
  }

  if (s.ok() && sync) {
    s = dest_writer->Sync(false);
  }
  if (s.ok()) {
    s = dest_writer->Close();
  }
  return s;
}

1598
// fname will always start with "/"
1599
Status BackupEngineImpl::AddBackupFileWorkItem(
1600
    std::unordered_set<std::string>& live_dst_paths,
1601
    std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1602
    BackupID backup_id, bool shared, const std::string& src_dir,
1603 1604 1605
    const std::string& fname, const EnvOptions& src_env_options,
    RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
    bool shared_checksum, std::function<void()> progress_callback,
1606 1607
    const std::string& contents, const std::string& src_checksum_func_name,
    const std::string& src_checksum_str) {
1608 1609 1610 1611
  assert(!fname.empty() && fname[0] == '/');
  assert(contents.empty() != src_dir.empty());

  std::string dst_relative = fname.substr(1);
I
Igor Canadi 已提交
1612
  std::string dst_relative_tmp;
1613
  std::string checksum_hex;
1614 1615
  std::string db_id;
  std::string db_session_id;
1616
  // whether the checksum for a table file is available
1617
  bool has_checksum = false;
1618

1619 1620 1621
  // Whenever a default checksum function name is passed in, we will compares
  // the corresponding checksum values after copying. Note that only table files
  // may have a known checksum function name passed in.
1622
  //
1623
  // If no default checksum function name is passed in and db session id is not
1624 1625
  // available, we will calculate the checksum *before* copying in two cases
  // (we always calcuate checksums when copying or creating for any file types):
1626 1627
  // a) share_files_with_checksum is true and file type is table;
  // b) share_table_files is true and the file exists already.
1628
  //
1629 1630
  // Step 0: Check if default checksum function name is passed in
  if (kDbFileChecksumFuncName == src_checksum_func_name) {
1631
    if (src_checksum_str == kUnknownFileChecksum) {
1632
      return Status::Aborted("Unknown checksum value for " + fname);
1633
    }
1634
    checksum_hex = ChecksumStrToHex(src_checksum_str);
1635 1636 1637 1638 1639
    has_checksum = true;
  }

  // Step 1: Prepare the relative path to destination
  if (shared && shared_checksum) {
1640
    if (GetNamingNoFlags() != BackupableDBOptions::kLegacyCrc32cAndFileSize) {
1641 1642 1643
      // Prepare db_session_id to add to the file name
      // Ignore the returned status
      // In the failed cases, db_id and db_session_id will be empty
Z
Zitan Chen 已提交
1644
      GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id,
1645 1646
                          &db_session_id)
          .PermitUncheckedError();
1647
    }
1648 1649 1650 1651 1652
    // Calculate checksum if checksum and db session id are not available.
    // If db session id is available, we will not calculate the checksum
    // since the session id should suffice to avoid file name collision in
    // the shared_checksum directory.
    if (!has_checksum && db_session_id.empty()) {
1653 1654
      Status s = ReadFileAndComputeChecksum(
          src_dir + fname, db_env_, src_env_options, size_limit, &checksum_hex);
1655 1656 1657 1658 1659
      if (!s.ok()) {
        return s;
      }
      has_checksum = true;
    }
1660 1661
    if (size_bytes == port::kMaxUint64) {
      return Status::NotFound("File missing: " + src_dir + fname);
1662
    }
1663
    // dst_relative depends on the following conditions:
1664
    // 1) the naming scheme is kUseDbSessionId,
1665 1666 1667 1668 1669 1670 1671 1672 1673
    // 2) db_session_id is not empty,
    // 3) checksum is available in the DB manifest.
    // If 1,2,3) are satisfied, then dst_relative will be of the form:
    // shared_checksum/<file_number>_<checksum>_<db_session_id>.sst
    // If 1,2) are satisfied, then dst_relative will be of the form:
    // shared_checksum/<file_number>_<db_session_id>.sst
    // Otherwise, dst_relative is of the form
    // shared_checksum/<file_number>_<checksum>_<size>.sst
    dst_relative = GetSharedFileWithChecksum(
1674
        dst_relative, has_checksum, checksum_hex, size_bytes, db_session_id);
1675 1676 1677
    dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
    dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  } else if (shared) {
I
Igor Canadi 已提交
1678 1679
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
I
Igor Canadi 已提交
1680
  } else {
I
Igor Canadi 已提交
1681
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
I
Igor Canadi 已提交
1682
  }
1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696

  // We copy into `temp_dest_path` and, once finished, rename it to
  // `final_dest_path`. This allows files to atomically appear at
  // `final_dest_path`. We can copy directly to the final path when atomicity
  // is unnecessary, like for files in private backup directories.
  const std::string* copy_dest_path;
  std::string temp_dest_path;
  std::string final_dest_path = GetAbsolutePath(dst_relative);
  if (!dst_relative_tmp.empty()) {
    temp_dest_path = GetAbsolutePath(dst_relative_tmp);
    copy_dest_path = &temp_dest_path;
  } else {
    copy_dest_path = &final_dest_path;
  }
I
Igor Canadi 已提交
1697

1698
  // Step 2: Determine whether to copy or not
1699 1700
  // if it's shared, we also need to check if it exists -- if it does, no need
  // to copy it again.
I
Igor Canadi 已提交
1701
  bool need_to_copy = true;
1702
  // true if final_dest_path is the same path as another live file
1703
  const bool same_path =
1704
      live_dst_paths.find(final_dest_path) != live_dst_paths.end();
A
agiardullo 已提交
1705 1706 1707

  bool file_exists = false;
  if (shared && !same_path) {
1708 1709
    // Should be in shared directory but not a live path, check existence in
    // shared directory
1710
    Status exist = backup_env_->FileExists(final_dest_path);
A
agiardullo 已提交
1711 1712 1713 1714 1715 1716 1717 1718 1719
    if (exist.ok()) {
      file_exists = true;
    } else if (exist.IsNotFound()) {
      file_exists = false;
    } else {
      return exist;
    }
  }

1720 1721 1722
  if (!contents.empty()) {
    need_to_copy = false;
  } else if (shared && (same_path || file_exists)) {
I
Igor Canadi 已提交
1723
    need_to_copy = false;
1724 1725 1726
    auto find_result = backuped_file_infos_.find(dst_relative);
    if (find_result == backuped_file_infos_.end() && !same_path) {
      // file exists but not referenced
1727 1728
      ROCKS_LOG_INFO(
          options_.info_log,
I
Igor Canadi 已提交
1729 1730
          "%s already present, but not referenced by any backup. We will "
          "overwrite the file.",
1731
          fname.c_str());
I
Igor Canadi 已提交
1732
      need_to_copy = true;
1733 1734
      //**TODO: What to do on error?
      backup_env_->DeleteFile(final_dest_path).PermitUncheckedError();
1735
    } else {
1736
      // file exists and referenced
1737
      if (!has_checksum) {
1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762
        if (!same_path) {
          assert(find_result != backuped_file_infos_.end());
          // Note: to save I/O on incremental backups, we copy prior known
          // checksum of the file instead of reading entire file contents
          // to recompute it.
          checksum_hex = find_result->second->checksum_hex;
          has_checksum = true;
          // Regarding corruption detection, consider:
          // (a) the DB file is corrupt (since previous backup) and the backup
          // file is OK: we failed to detect, but the backup is safe. DB can
          // be repaired/restored once its corruption is detected.
          // (b) the backup file is corrupt (since previous backup) and the
          // db file is OK: we failed to detect, but the backup is corrupt.
          // CreateNewBackup should support fast incremental backups and
          // there's no way to support that without reading all the files.
          // We might add an option for extra checks on incremental backup,
          // but until then, use VerifyBackups to check existing backup data.
          // (c) file name collision with legitimately different content.
          // This is almost inconceivable with a well-generated DB session
          // ID, but even in that case, we double check the file sizes in
          // BackupMeta::AddFile.
        } else {
          // same_path should not happen for a standard DB, so OK to
          // read file contents to check for checksum mismatch between
          // two files from same DB getting same name.
1763 1764 1765
          Status s = ReadFileAndComputeChecksum(src_dir + fname, db_env_,
                                                src_env_options, size_limit,
                                                &checksum_hex);
1766 1767 1768
          if (!s.ok()) {
            return s;
          }
1769
        }
1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780
      }
      if (!db_session_id.empty()) {
        ROCKS_LOG_INFO(options_.info_log,
                       "%s already present, with checksum %s, size %" PRIu64
                       " and DB session identity %s",
                       fname.c_str(), checksum_hex.c_str(), size_bytes,
                       db_session_id.c_str());
      } else {
        ROCKS_LOG_INFO(options_.info_log,
                       "%s already present, with checksum %s and size %" PRIu64,
                       fname.c_str(), checksum_hex.c_str(), size_bytes);
1781
      }
1782
    }
I
Igor Canadi 已提交
1783
  }
1784
  live_dst_paths.insert(final_dest_path);
1785

1786
  // Step 3: Add work item
1787
  if (!contents.empty() || need_to_copy) {
1788
    ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1789
                   copy_dest_path->c_str());
1790
    CopyOrCreateWorkItem copy_or_create_work_item(
1791
        src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1792
        db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1793
        size_limit, progress_callback, has_checksum, src_checksum_func_name,
1794
        checksum_hex, db_id, db_session_id);
1795 1796
    BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
        copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1797
        backup_env_, temp_dest_path, final_dest_path, dst_relative);
1798 1799
    files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
    backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1800
  } else {
1801 1802 1803
    std::promise<CopyOrCreateResult> promise_result;
    BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
        promise_result.get_future(), shared, need_to_copy, backup_env_,
1804
        temp_dest_path, final_dest_path, dst_relative);
1805 1806
    backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
    CopyOrCreateResult result;
1807
    result.status = Status::OK();
1808
    result.size = size_bytes;
1809 1810 1811
    result.checksum_hex = std::move(checksum_hex);
    result.db_id = std::move(db_id);
    result.db_session_id = std::move(db_session_id);
1812
    promise_result.set_value(std::move(result));
L
Lei Jin 已提交
1813
  }
1814
  return Status::OK();
L
Lei Jin 已提交
1815 1816
}

1817 1818 1819
// Reads `src` through `src_env` and stores the hex-encoded crc32c of the
// bytes read in `*checksum_hex`. At most `size_limit` bytes are read; a
// `size_limit` of 0 means no limit.
//
// Returns Aborted if `checksum_hex` is null, Incomplete("Backup stopped") if
// stop_backup_ is observed set, or the error from opening/reading the file.
Status BackupEngineImpl::ReadFileAndComputeChecksum(
    const std::string& src, Env* src_env, const EnvOptions& src_env_options,
    uint64_t size_limit, std::string* checksum_hex) {
  if (checksum_hex == nullptr) {
    return Status::Aborted("Checksum pointer is null");
  }
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  std::unique_ptr<SequentialFile> src_file;
  Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<SequentialFileReader> src_reader(
      new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);

  uint32_t running_crc = 0;
  Slice data;
  for (;;) {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }

    // Never request more than what is left of the size limit.
    size_t chunk = copy_file_buffer_size_;
    if (size_limit <= chunk) {
      chunk = static_cast<size_t>(size_limit);
    }
    s = src_reader->Read(chunk, &data, buf.get());
    if (!s.ok()) {
      return s;
    }

    size_limit -= data.size();
    running_crc = crc32c::Extend(running_crc, data.data(), data.size());

    // Stop at end of file (empty read) or once the limit is used up.
    if (data.size() == 0 || size_limit == 0) {
      break;
    }
  }

  checksum_hex->assign(ChecksumInt32ToHex(running_crc));

  return s;
}

1860
// Opens the table file at `file_path` with an SstFileDumper and copies the
// db_id / db_session_id stored in its table properties into the non-null
// output arguments (at least one of them must be non-null).
//
// Returns the dumper's status if the file could not be opened, Corruption if
// no table properties are available, NotFound if `db_session_id` was
// requested but the stored session id is empty, and OK otherwise.
Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
                                             const EnvOptions& src_env_options,
                                             const std::string& file_path,
                                             std::string* db_id,
                                             std::string* db_session_id) {
  assert(db_id != nullptr || db_session_id != nullptr);

  Options options;
  options.env = src_env;
  SstFileDumper sst_reader(options, file_path,
                           2 * 1024 * 1024
                           /* readahead_size */,
                           false /* verify_checksum */, false /* output_hex */,
                           false /* decode_blob_index */, src_env_options,
                           true /* silent */);

  Status s = sst_reader.getStatus();
  if (!s.ok()) {
    ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
                   file_path.c_str(), s.ToString().c_str());
    return s;
  }

  // Prefer the properties read through the table reader; fall back to the
  // ones captured when sst_reader was initialized.
  std::shared_ptr<const TableProperties> tp;
  const TableProperties* table_properties = nullptr;
  if (sst_reader.ReadTableProperties(&tp).ok()) {
    table_properties = tp.get();
  } else {
    table_properties = sst_reader.GetInitTableProperties();
  }

  if (table_properties == nullptr) {
    s = Status::Corruption("Table properties missing in " + file_path);
    ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
    return s;
  }

  if (db_id != nullptr) {
    db_id->assign(table_properties->db_id);
  }
  if (db_session_id != nullptr) {
    db_session_id->assign(table_properties->db_session_id);
    if (db_session_id->empty()) {
      s = Status::NotFound("DB session identity not found in " + file_path);
      ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
      return s;
    }
  }
  return Status::OK();
}
1913

1914 1915 1916
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) {
  std::vector<std::string> children;
1917
  db_env_->GetChildren(dir, &children).PermitUncheckedError();  // ignore errors
1918 1919 1920 1921 1922 1923 1924 1925 1926

  for (const auto& f : children) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(f, &number, &type);
    if (ok && (file_type_filter & (1 << type))) {
      // don't delete this file
      continue;
    }
1927
    db_env_->DeleteFile(dir + "/" + f).PermitUncheckedError();  // ignore errors
1928 1929 1930
  }
}

1931
// Adds one "<dir>/<name>" -> size-in-bytes entry to `*result` for every child
// of `dir` reported by `env`. A missing `dir` adds nothing and counts as
// success; any other failure of the existence check or of the listing is
// returned. Entries already present in `*result` are left untouched.
Status BackupEngineImpl::InsertPathnameToSizeBytes(
    const std::string& dir, Env* env,
    std::unordered_map<std::string, uint64_t>* result) {
  assert(result != nullptr);

  std::vector<Env::FileAttributes> children;
  Status status = env->FileExists(dir);
  if (status.ok()) {
    status = env->GetChildrenFileAttributes(dir, &children);
  } else if (status.IsNotFound()) {
    // Insert no entries can be considered success
    status = Status::OK();
  }

  // Join with exactly one '/' between the directory and the child name.
  std::string prefix = dir;
  if (dir.empty() || dir.back() != '/') {
    prefix += "/";
  }
  for (const auto& child : children) {
    result->emplace(prefix + child.name, child.size_bytes);
  }
  return status;
}

H
Hasnain Lakhani 已提交
1951
// Removes files and directories in the backup directory that no backup
// refers to:
//  - files in the shared directories that are not tracked in
//    backuped_file_infos_ or whose refcount is 0;
//  - private directories that are leftover ".tmp" directories, or whose
//    numeric id does not belong to a registered backup.
//
// The engine must not be read-only. The returned status only reflects
// failures to list the shared/private directories; individual deletion
// failures are logged and merely set might_need_garbage_collect_ so that a
// later attempt can retry.
Status BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);

  // We will make a best effort to remove all garbage even in the presence
  // of inconsistencies or I/O failures that inhibit finding garbage.
  Status overall_status = Status::OK();
  // If all goes well, we don't need another auto-GC this session
  might_need_garbage_collect_ = false;

  ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");

  // delete obsolete shared files
  for (bool with_checksum : {false, true}) {
    std::vector<std::string> shared_children;
    {
      std::string shared_path;
      if (with_checksum) {
        shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
      } else {
        shared_path = GetAbsolutePath(GetSharedFileRel());
      }
      auto s = backup_env_->FileExists(shared_path);
      if (s.ok()) {
        s = backup_env_->GetChildren(shared_path, &shared_children);
      } else if (s.IsNotFound()) {
        // A missing shared directory simply has nothing to collect.
        s = Status::OK();
      }
      if (!s.ok()) {
        overall_status = s;
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    for (auto& child : shared_children) {
      std::string rel_fname;
      if (with_checksum) {
        rel_fname = GetSharedFileWithChecksumRel(child);
      } else {
        rel_fname = GetSharedFileRel(child);
      }
      auto child_itr = backuped_file_infos_.find(rel_fname);
      // if it's not refcounted, delete it
      if (child_itr == backuped_file_infos_.end() ||
          child_itr->second->refs == 0) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       rel_fname.c_str(), s.ToString().c_str());
        backuped_file_infos_.erase(rel_fname);
        if (!s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
  }

  // delete obsolete private files
  std::vector<std::string> private_children;
  {
    auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                                      &private_children);
    if (!s.ok()) {
      overall_status = s;
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }
  for (auto& child : private_children) {
    BackupID backup_id = 0;
    bool tmp_dir = child.find(".tmp") != std::string::npos;
    // backup_id stays 0 when the name does not start with a number.
    sscanf(child.c_str(), "%u", &backup_id);
    if (!tmp_dir &&  // if it's tmp_dir, delete it
        (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
      // it's either not a number or it's still alive. continue
      continue;
    }
    // here we have to delete the dir and all its children.
    // A tmp directory must be addressed by its own name: building the path
    // from the parsed id alone would point at the non-tmp directory of that
    // id, which leaves the tmp directory in place and could wipe the private
    // files of a live backup with the same id.
    // NOTE(review): the trailing '/' mirrors what GetPrivateFileRel() is
    // assumed to produce, since subchild names are appended directly below.
    std::string full_private_path =
        tmp_dir ? GetAbsolutePath(GetPrivateDirRel() + "/" + child + "/")
                : GetAbsolutePath(GetPrivateFileRel(backup_id));
    std::vector<std::string> subchildren;
    if (backup_env_->GetChildren(full_private_path, &subchildren).ok()) {
      for (auto& subchild : subchildren) {
        Status s = backup_env_->DeleteFile(full_private_path + subchild);
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       (full_private_path + subchild).c_str(),
                       s.ToString().c_str());
        if (!s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
    // finally delete the private dir
    Status s = backup_env_->DeleteDir(full_private_path);
    ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
                   full_private_path.c_str(), s.ToString().c_str());
    if (!s.ok()) {
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }

  assert(overall_status.ok() || might_need_garbage_collect_);
  return overall_status;
}

// ------- BackupMeta class --------

2061 2062 2063
// Attaches `file_info` to this backup.
//
// If no entry with the same filename exists in *file_infos_, `file_info` is
// inserted there with a refcount of 1. Otherwise the existing entry must
// agree with `file_info` on size and checksum (Corruption is returned if
// not) and its refcount is incremented. On success the tracked entry is
// appended to files_ and its size is added to size_.
Status BackupEngineImpl::BackupMeta::AddFile(
    std::shared_ptr<FileInfo> file_info) {
  auto itr = file_infos_->find(file_info->filename);

  if (itr != file_infos_->end()) {
    const auto& existing = itr->second;

    // Compare sizes, because we scanned that off the filesystem on both
    // ends. This is like a check in VerifyBackup.
    if (existing->size != file_info->size) {
      std::string msg = "Size mismatch for existing backup file: ";
      msg.append(file_info->filename);
      msg.append(" Size in backup is " + ToString(existing->size) +
                 " while size in DB is " + ToString(file_info->size));
      msg.append(
          " If this DB file checks as not corrupt, try deleting old"
          " backups or backing up to a different backup directory.");
      return Status::Corruption(msg);
    }

    // Note: to save I/O, this check will pass trivially on already backed
    // up files that don't have the checksum in their name. And it should
    // never fail for files that do have checksum in their name.
    if (existing->checksum_hex != file_info->checksum_hex) {
      // Should never reach here, but produce an appropriate corruption
      // message in case we do in a release build.
      assert(false);
      std::string msg = "Checksum mismatch for existing backup file: ";
      msg.append(file_info->filename);
      msg.append(" Expected checksum is " + existing->checksum_hex +
                 " while computed checksum is " + file_info->checksum_hex);
      msg.append(
          " If this DB file checks as not corrupt, try deleting old"
          " backups or backing up to a different backup directory.");
      return Status::Corruption(msg);
    }

    ++existing->refs;  // increase refcount if already present
  } else {
    const auto inserted = file_infos_->emplace(file_info->filename, file_info);
    if (!inserted.second) {
      // if this happens, something is seriously wrong
      return Status::Corruption("In memory metadata insertion error");
    }
    itr = inserted.first;
    itr->second->refs = 1;
  }

  size_ += file_info->size;
  files_.push_back(itr->second);

  return Status::OK();
}

2111 2112
// Drops this backup's references: decrements the refcount of every file in
// files_, clears files_ and resets timestamp_ to 0. When `delete_meta` is
// true the meta file is removed as well; a meta file that does not exist is
// not an error. Returns the status of the meta-file handling (OK when
// `delete_meta` is false).
Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
  for (const auto& tracked : files_) {
    --tracked->refs;  // decrease refcount
  }
  files_.clear();

  Status s;
  if (delete_meta) {
    // delete meta file
    s = env_->FileExists(meta_filename_);
    if (s.IsNotFound()) {
      s = Status::OK();  // nothing to delete
    } else if (s.ok()) {
      s = env_->DeleteFile(meta_filename_);
    }
  }

  timestamp_ = 0;
  return s;
}

2130 2131
// Literal prefix (note the trailing space) that introduces the optional
// application-metadata line of a backup meta file; the hex-encoded metadata
// follows it up to the end of the line.
Slice kMetaDataPrefix("metadata ");

I
Igor Canadi 已提交
2132 2133
// each backup meta file is of the format:
// <timestamp>
2134
// <seq number>
2135
// <metadata(literal string)> <metadata> (optional)
I
Igor Canadi 已提交
2136
// <number of files>
2137 2138
// <file1> <crc32(literal string)> <crc32c_value>
// <file2> <crc32(literal string)> <crc32c_value>
I
Igor Canadi 已提交
2139
// ...
2140
Status BackupEngineImpl::BackupMeta::LoadFromFile(
2141
    const std::string& backup_dir,
2142
    const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
I
Igor Canadi 已提交
2143 2144
  assert(Empty());
  Status s;
2145
  std::unique_ptr<SequentialFile> backup_meta_file;
I
Igor Canadi 已提交
2146 2147 2148 2149 2150
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

2151
  std::unique_ptr<SequentialFileReader> backup_meta_reader(
2152 2153
      new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
                               meta_filename_));
2154
  std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
2155
  Slice data;
2156
  s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
I
Igor Canadi 已提交
2157 2158

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
L
Lei Jin 已提交
2159
    return s.ok() ? Status::Corruption("File size too big") : s;
I
Igor Canadi 已提交
2160 2161 2162 2163
  }
  buf[data.size()] = 0;

  uint32_t num_files = 0;
2164 2165 2166 2167 2168
  char *next;
  timestamp_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  sequence_number_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180

  if (data.starts_with(kMetaDataPrefix)) {
    // app metadata present
    data.remove_prefix(kMetaDataPrefix.size());
    Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
    bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
    if (!decode_success) {
      return Status::Corruption(
          "Failed to decode stored hex encoded app metadata");
    }
  }

2181
  num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
2182
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
I
Igor Canadi 已提交
2183

2184
  std::vector<std::shared_ptr<FileInfo>> files;
2185

2186
  // WART: The checksums are crc32c, not original crc32
2187 2188
  Slice checksum_prefix("crc32 ");

I
Igor Canadi 已提交
2189
  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
L
Lei Jin 已提交
2190
    auto line = GetSliceUntil(&data, '\n');
2191 2192
    // filename is relative, i.e., shared/number.sst,
    // shared_checksum/number.sst, or private/backup_id/number.sst
L
Lei Jin 已提交
2193 2194
    std::string filename = GetSliceUntil(&line, ' ').ToString();

I
Igor Canadi 已提交
2195
    uint64_t size;
2196 2197
    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
    if (file_info) {
2198 2199
      size = file_info->size;
    } else {
2200 2201 2202
      std::string abs_path = backup_dir + "/" + filename;
      try {
        size = abs_path_to_size.at(abs_path);
S
SherlockNoMad 已提交
2203
      } catch (std::out_of_range&) {
2204
        return Status::Corruption("Size missing for pathname: " + abs_path);
2205
      }
I
Igor Canadi 已提交
2206
    }
L
Lei Jin 已提交
2207 2208

    if (line.empty()) {
I
Igor Canadi 已提交
2209 2210
      return Status::Corruption("File checksum is missing for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
2211 2212 2213
    }

    uint32_t checksum_value = 0;
2214 2215
    if (line.starts_with(checksum_prefix)) {
      line.remove_prefix(checksum_prefix.size());
2216
      checksum_value = static_cast<uint32_t>(strtoul(line.data(), nullptr, 10));
2217
      if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
2218 2219
        return Status::Corruption("Invalid checksum value for " + filename +
                                  " in " + meta_filename_);
L
Lei Jin 已提交
2220 2221
      }
    } else {
I
Igor Canadi 已提交
2222 2223
      return Status::Corruption("Unknown checksum type for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
2224 2225
    }

2226
    files.emplace_back(
2227
        new FileInfo(filename, size, ChecksumInt32ToHex(checksum_value)));
2228 2229
  }

I
Igor Canadi 已提交
2230 2231
  if (s.ok() && data.size() > 0) {
    // file has to be read completely. if not, we count it as corruption
I
Igor Canadi 已提交
2232 2233
    s = Status::Corruption("Tailing data in backup meta file in " +
                           meta_filename_);
I
Igor Canadi 已提交
2234 2235
  }

2236
  if (s.ok()) {
2237
    files_.reserve(files.size());
L
Lei Jin 已提交
2238 2239 2240 2241 2242
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
2243
    }
I
Igor Canadi 已提交
2244 2245 2246 2247 2248
  }

  return s;
}

I
Igor Canadi 已提交
2249
// Writes this backup's metadata to meta_tmp_filename_ and then renames that
// file to meta_filename_. The text written is: timestamp, sequence number,
// an optional hex-encoded app-metadata line, the file count, and one
// "<filename> crc32 <value>" line per file. When `sync` is true the temp file
// is synced before it is closed.
// Returns the first error hit while creating, writing, syncing, closing or
// renaming the file; OK otherwise.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  EnvOptions tmp_file_options;
  tmp_file_options.use_mmap_writes = false;
  tmp_file_options.use_direct_writes = false;

  std::unique_ptr<WritableFile> tmp_file;
  Status status =
      env_->NewWritableFile(meta_tmp_filename_, &tmp_file, tmp_file_options);
  if (!status.ok()) {
    return status;
  }

  // Assemble the whole file in memory so it goes out in a single Append().
  std::ostringstream contents;
  contents << timestamp_ << "\n";
  contents << sequence_number_ << "\n";
  if (!app_metadata_.empty()) {
    contents << kMetaDataPrefix.ToString()
             << Slice(app_metadata_).ToString(/* hex */ true) << "\n";
  }
  contents << files_.size() << "\n";
  for (const auto& entry : files_) {
    // use crc32c for now, switch to something else if needed
    // WART: The checksums are crc32c, not original crc32
    contents << entry->filename << " crc32 "
             << ChecksumHexToInt32(entry->checksum_hex) << "\n";
  }

  status = tmp_file->Append(Slice(contents.str()));
  if (!status.ok()) {
    return status;
  }
  if (sync) {
    status = tmp_file->Sync();
    if (!status.ok()) {
      return status;
    }
  }
  status = tmp_file->Close();
  if (!status.ok()) {
    return status;
  }
  // Only a fully written and closed temp file replaces the real meta file.
  return env_->RenameFile(meta_tmp_filename_, meta_filename_);
}

I
Igor Canadi 已提交
2291 2292 2293
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
 public:
2294 2295
  BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
      : backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
I
Igor Canadi 已提交
2296

2297
  ~BackupEngineReadOnlyImpl() override {}
I
Igor Canadi 已提交
2298

2299 2300
  // The returned BackupInfos are in chronological order, which means the
  // latest backup comes last.
2301
  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
I
Igor Canadi 已提交
2302 2303 2304
    backup_engine_->GetBackupInfo(backup_info);
  }

2305
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
H
Hasnain Lakhani 已提交
2306 2307 2308
    backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
  }

2309 2310 2311 2312 2313 2314
  using BackupEngineReadOnly::RestoreDBFromBackup;
  Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
                             const std::string& db_dir,
                             const std::string& wal_dir) override {
    return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
                                               wal_dir);
I
Igor Canadi 已提交
2315 2316
  }

2317 2318 2319 2320 2321
  using BackupEngineReadOnly::RestoreDBFromLatestBackup;
  Status RestoreDBFromLatestBackup(const RestoreOptions& options,
                                   const std::string& db_dir,
                                   const std::string& wal_dir) override {
    return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
I
Igor Canadi 已提交
2322 2323
  }

2324
  Status VerifyBackup(BackupID backup_id,
2325
                      bool verify_with_checksum = false) override {
2326
    return backup_engine_->VerifyBackup(backup_id, verify_with_checksum);
2327 2328
  }

2329 2330
  Status Initialize() { return backup_engine_->Initialize(); }

I
Igor Canadi 已提交
2331
 private:
I
Igor Canadi 已提交
2332
  std::unique_ptr<BackupEngineImpl> backup_engine_;
I
Igor Canadi 已提交
2333 2334
};

2335
// Creates and initializes a read-only backup engine.
// Returns InvalidArgument, leaving *backup_engine_ptr untouched, when
// `options.destroy_old_data` is set. When initialization fails,
// *backup_engine_ptr is set to nullptr and the failure is returned. On success
// the caller receives ownership of the new engine through *backup_engine_ptr.
Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
                                  BackupEngineReadOnly** backup_engine_ptr) {
  if (options.destroy_old_data) {
    return Status::InvalidArgument(
        "Can't destroy old data with ReadOnly BackupEngine");
  }

  std::unique_ptr<BackupEngineReadOnlyImpl> engine(
      new BackupEngineReadOnlyImpl(options, env));
  Status init_status = engine->Initialize();
  if (init_status.ok()) {
    // Hand ownership over to the caller.
    *backup_engine_ptr = engine.release();
    return Status::OK();
  }

  *backup_engine_ptr = nullptr;
  return init_status;
}

2352
}  // namespace ROCKSDB_NAMESPACE
I
Igor Canadi 已提交
2353 2354

#endif  // ROCKSDB_LITE