backupable_db.cc 60.2 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10 11
#ifndef ROCKSDB_LITE

12
#include "rocksdb/utilities/backupable_db.h"
I
Igor Canadi 已提交
13
#include "db/filename.h"
14
#include "util/channel.h"
I
Igor Canadi 已提交
15
#include "util/coding.h"
L
Lei Jin 已提交
16
#include "util/crc32c.h"
17
#include "util/file_reader_writer.h"
I
Igor Canadi 已提交
18
#include "util/logging.h"
S
sdong 已提交
19
#include "util/string_util.h"
I
Igor Canadi 已提交
20 21
#include "rocksdb/transaction_log.h"

L
liuhuahang 已提交
22
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
23
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
24
#endif
I
Igor Canadi 已提交
25 26

#include <inttypes.h>
27
#include <stdlib.h>
I
Igor Canadi 已提交
28 29 30
#include <algorithm>
#include <vector>
#include <map>
31
#include <mutex>
I
Igor Canadi 已提交
32
#include <sstream>
I
Igor Canadi 已提交
33 34
#include <string>
#include <limits>
I
Igor Canadi 已提交
35
#include <atomic>
36 37
#include <future>
#include <thread>
38
#include <unordered_map>
39
#include <unordered_set>
40 41
#include "port/port.h"

I
Igor Canadi 已提交
42 43 44

namespace rocksdb {

L
Lei Jin 已提交
45
class BackupRateLimiter {
I
Igor Canadi 已提交
46
 public:
L
Lei Jin 已提交
47 48
  BackupRateLimiter(Env* env, uint64_t max_bytes_per_second,
                   uint64_t bytes_per_check)
I
Igor Canadi 已提交
49 50 51 52 53 54
      : env_(env),
        max_bytes_per_second_(max_bytes_per_second),
        bytes_per_check_(bytes_per_check),
        micros_start_time_(env->NowMicros()),
        bytes_since_start_(0) {}

55
  // thread safe
I
Igor Canadi 已提交
56
  void ReportAndWait(uint64_t bytes_since_last_call) {
57 58
    std::unique_lock<std::mutex> lk(lock_);

I
Igor Canadi 已提交
59 60 61 62 63 64 65 66 67 68 69 70
    bytes_since_start_ += bytes_since_last_call;
    if (bytes_since_start_ < bytes_per_check_) {
      // not enough bytes to be rate-limited
      return;
    }

    uint64_t now = env_->NowMicros();
    uint64_t interval = now - micros_start_time_;
    uint64_t should_take_micros =
        (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;

    if (should_take_micros > interval) {
71 72
      env_->SleepForMicroseconds(
          static_cast<int>(should_take_micros - interval));
I
Igor Canadi 已提交
73 74 75 76 77 78 79 80 81
      now = env_->NowMicros();
    }
    // reset interval
    micros_start_time_ = now;
    bytes_since_start_ = 0;
  }

 private:
  Env* env_;
82
  std::mutex lock_;
I
Igor Canadi 已提交
83 84 85 86 87 88 89
  uint64_t max_bytes_per_second_;
  uint64_t bytes_per_check_;
  uint64_t micros_start_time_;
  uint64_t bytes_since_start_;
  static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
// Record the outcome of one backup attempt by bumping the matching counter.
void BackupStatistics::IncrementNumberSuccessBackup() {
  ++number_success_backup;
}
void BackupStatistics::IncrementNumberFailBackup() {
  ++number_fail_backup;
}

// Read-only views of the counters above.
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  return number_success_backup;
}
uint32_t BackupStatistics::GetNumberFailBackup() const {
  return number_fail_backup;
}

// Human-readable summary of the backup counters, for example
// "# success backup: 3, # fail backup: 0".
std::string BackupStatistics::ToString() const {
  // The fixed text is 35 characters. Two uint32_t values add at most 20
  // digits, plus the terminating NUL: 56 bytes worst case. 64 bytes therefore
  // never truncates.
  char result[64];
  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
           GetNumberSuccessBackup(), GetNumberFailBackup());
  return result;
}

I
Igor Canadi 已提交
111
// Writes every BackupableDBOptions field to `logger`, one line per option.
// Option names are right-aligned to a common column.
void BackupableDBOptions::Dump(Logger* logger) const {
  Log(logger, "               Options.backup_dir: %s", backup_dir.c_str());
  // %p is specified for void*, so cast the typed pointers explicitly.
  Log(logger, "               Options.backup_env: %p",
      static_cast<void*>(backup_env));
  Log(logger, "        Options.share_table_files: %d",
      static_cast<int>(share_table_files));
  Log(logger, "Options.share_files_with_checksum: %d",
      static_cast<int>(share_files_with_checksum));
  Log(logger, "                 Options.info_log: %p",
      static_cast<void*>(info_log));
  Log(logger, "                     Options.sync: %d", static_cast<int>(sync));
  Log(logger, "         Options.destroy_old_data: %d",
      static_cast<int>(destroy_old_data));
  Log(logger, "         Options.backup_log_files: %d",
      static_cast<int>(backup_log_files));
  Log(logger, "        Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
  Log(logger, "       Options.restore_rate_limit: %" PRIu64,
      restore_rate_limit);
  Log(logger, "Options.max_background_operations: %d",
      max_background_operations);
}

I
Igor Canadi 已提交
129 130
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
131
 public:
I
Igor Canadi 已提交
132 133
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
                   bool read_only = false);
I
Igor Canadi 已提交
134
  ~BackupEngineImpl();
I
Igor Sugak 已提交
135 136 137 138
  Status CreateNewBackup(DB* db, bool flush_before_backup = false) override;
  Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
  Status DeleteBackup(BackupID backup_id) override;
  void StopBackup() override {
I
Igor Canadi 已提交
139 140
    stop_backup_.store(true, std::memory_order_release);
  }
I
Igor Sugak 已提交
141 142 143 144 145 146 147 148 149 150
  Status GarbageCollect() override;

  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
  Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override;
  Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override {
151 152
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir,
                               restore_options);
I
Igor Canadi 已提交
153 154
  }

155 156
  Status Initialize();

I
Igor Canadi 已提交
157
 private:
158 159
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);

L
Lei Jin 已提交
160 161 162 163
  struct FileInfo {
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

164 165 166
    FileInfo(const FileInfo&) = delete;
    FileInfo& operator=(const FileInfo&) = delete;

L
Lei Jin 已提交
167 168 169
    int refs;
    const std::string filename;
    const uint64_t size;
170
    const uint32_t checksum_value;
L
Lei Jin 已提交
171 172
  };

I
Igor Canadi 已提交
173 174 175
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
176 177
        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
        Env* env)
I
Igor Canadi 已提交
178
      : timestamp_(0), size_(0), meta_filename_(meta_filename),
L
Lei Jin 已提交
179
        file_infos_(file_infos), env_(env) {}
I
Igor Canadi 已提交
180

181 182 183
    BackupMeta(const BackupMeta&) = delete;
    BackupMeta& operator=(const BackupMeta&) = delete;

I
Igor Canadi 已提交
184 185 186 187 188 189 190 191 192 193 194
    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
195
    uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
196 197 198 199 200 201
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
202

203
    Status AddFile(std::shared_ptr<FileInfo> file_info);
L
Lei Jin 已提交
204

205
    Status Delete(bool delete_meta = true);
I
Igor Canadi 已提交
206 207 208 209 210

    bool Empty() {
      return files_.empty();
    }

211
    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
212 213 214
      auto it = file_infos_->find(filename);
      if (it == file_infos_->end())
        return nullptr;
215
      return it->second;
216 217
    }

218
    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
I
Igor Canadi 已提交
219 220 221 222 223 224
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

I
Igor Canadi 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
    std::string GetInfoString() {
      std::ostringstream ss;
      ss << "Timestamp: " << timestamp_ << std::endl;
      char human_size[16];
      AppendHumanBytes(size_, human_size, sizeof(human_size));
      ss << "Size: " << human_size << std::endl;
      ss << "Files:" << std::endl;
      for (const auto& file : files_) {
        AppendHumanBytes(file->size, human_size, sizeof(human_size));
        ss << file->filename << ", size " << human_size << ", refs "
           << file->refs << std::endl;
      }
      return ss.str();
    }

I
Igor Canadi 已提交
240 241
   private:
    int64_t timestamp_;
242 243 244
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
245 246 247
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
248 249
    std::vector<std::shared_ptr<FileInfo>> files_;
    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
I
Igor Canadi 已提交
250
    Env* env_;
251

252 253
    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  };  // BackupMeta
I
Igor Canadi 已提交
254 255 256 257 258 259 260 261 262

  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.size() == 0 || relative_path[0] != '/');
    return options_.backup_dir + "/" + relative_path;
  }
  inline std::string GetPrivateDirRel() const {
    return "private";
  }
263 264 265
  inline std::string GetSharedChecksumDirRel() const {
    return "shared_checksum";
  }
I
Igor Canadi 已提交
266
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
267 268
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
269
    assert(file.size() == 0 || file[0] != '/');
S
sdong 已提交
270
    return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id) +
I
Igor Canadi 已提交
271
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
272
  }
I
Igor Canadi 已提交
273 274
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
I
Igor Canadi 已提交
275
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
276
    return "shared/" + file + (tmp ? ".tmp" : "");
I
Igor Canadi 已提交
277
  }
278 279 280 281 282 283 284 285 286 287 288
  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
                                                  bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
    return GetSharedChecksumDirRel() + "/" + file + (tmp ? ".tmp" : "");
  }
  inline std::string GetSharedFileWithChecksum(const std::string& file,
                                               const uint32_t checksum_value,
                                               const uint64_t file_size) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    return file_copy.insert(file_copy.find_last_of('.'),
S
sdong 已提交
289 290
                            "_" + rocksdb::ToString(checksum_value) + "_" +
                                rocksdb::ToString(file_size));
291 292 293 294 295 296 297 298
  }
  inline std::string GetFileFromChecksumFile(const std::string& file) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    size_t first_underscore = file_copy.find_first_of('_');
    return file_copy.erase(first_underscore,
                           file_copy.find_last_of('.') - first_underscore);
  }
I
Igor Canadi 已提交
299 300 301 302 303 304 305
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
  }
  inline std::string GetBackupMetaDir() const {
    return GetAbsolutePath("meta");
  }
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
S
sdong 已提交
306
    return GetBackupMetaDir() + "/" + rocksdb::ToString(backup_id);
I
Igor Canadi 已提交
307 308 309 310 311 312 313 314 315 316
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
L
Lei Jin 已提交
317
                  BackupRateLimiter* rate_limiter,
I
Igor Canadi 已提交
318
                  uint64_t* size = nullptr,
L
Lei Jin 已提交
319
                  uint32_t* checksum_value = nullptr,
I
Igor Canadi 已提交
320
                  uint64_t size_limit = 0);
L
Lei Jin 已提交
321 322 323 324 325 326

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

327 328 329 330 331 332 333 334 335 336 337 338 339 340
  struct CopyResult {
    uint64_t size;
    uint32_t checksum_value;
    Status status;
  };
  struct CopyWorkItem {
    std::string src_path;
    std::string dst_path;
    Env* src_env;
    Env* dst_env;
    bool sync;
    BackupRateLimiter* rate_limiter;
    uint64_t size_limit;
    std::promise<CopyResult> result;
341

342
    CopyWorkItem() {}
343 344 345
    CopyWorkItem(const CopyWorkItem&) = delete;
    CopyWorkItem& operator=(const CopyWorkItem&) = delete;

346
    CopyWorkItem(CopyWorkItem&& o) noexcept { *this = std::move(o); }
347

348
    CopyWorkItem& operator=(CopyWorkItem&& o) noexcept {
349 350 351 352 353 354 355 356 357 358 359
      src_path = std::move(o.src_path);
      dst_path = std::move(o.dst_path);
      src_env = o.src_env;
      dst_env = o.dst_env;
      sync = o.sync;
      rate_limiter = o.rate_limiter;
      size_limit = o.size_limit;
      result = std::move(o.result);
      return *this;
    }

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
    CopyWorkItem(std::string _src_path,
                 std::string _dst_path,
                 Env* _src_env,
                 Env* _dst_env,
                 bool _sync,
                 BackupRateLimiter* _rate_limiter,
                 uint64_t _size_limit)
        : src_path(std::move(_src_path)),
          dst_path(std::move(_dst_path)),
          src_env(_src_env),
          dst_env(_dst_env),
          sync(_sync),
          rate_limiter(_rate_limiter),
          size_limit(_size_limit) {}
  };

  struct BackupAfterCopyWorkItem {
    std::future<CopyResult> result;
    bool shared;
    bool needed_to_copy;
    Env* backup_env;
    std::string dst_path_tmp;
    std::string dst_path;
    std::string dst_relative;
    BackupAfterCopyWorkItem() {}
385

386
    BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) noexcept {
387 388 389
      *this = std::move(o);
    }

390
    BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) noexcept {
391 392 393 394 395 396 397 398 399 400
      result = std::move(o.result);
      shared = o.shared;
      needed_to_copy = o.needed_to_copy;
      backup_env = o.backup_env;
      dst_path_tmp = std::move(o.dst_path_tmp);
      dst_path = std::move(o.dst_path);
      dst_relative = std::move(o.dst_relative);
      return *this;
    }

S
sdong 已提交
401 402 403
    BackupAfterCopyWorkItem(std::future<CopyResult>&& _result, bool _shared,
                            bool _needed_to_copy, Env* _backup_env,
                            std::string _dst_path_tmp, std::string _dst_path,
404 405 406 407 408 409 410 411 412 413 414 415 416 417
                            std::string _dst_relative)
        : result(std::move(_result)),
          shared(_shared),
          needed_to_copy(_needed_to_copy),
          backup_env(_backup_env),
          dst_path_tmp(std::move(_dst_path_tmp)),
          dst_path(std::move(_dst_path)),
          dst_relative(std::move(_dst_relative)) {}
  };

  struct RestoreAfterCopyWorkItem {
    std::future<CopyResult> result;
    uint32_t checksum_value;
    RestoreAfterCopyWorkItem() {}
418
    RestoreAfterCopyWorkItem(std::future<CopyResult>&& _result,
419
                             uint32_t _checksum_value)
S
sdong 已提交
420
        : result(std::move(_result)), checksum_value(_checksum_value) {}
421
    RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) noexcept {
422 423 424
      *this = std::move(o);
    }

425
    RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) noexcept {
426 427 428 429
      result = std::move(o.result);
      checksum_value = o.checksum_value;
      return *this;
    }
430 431
  };

432
  bool initialized_;
433 434 435 436 437 438 439 440 441 442 443 444 445 446
  channel<CopyWorkItem> files_to_copy_;
  std::vector<std::thread> threads_;

  Status AddBackupFileWorkItem(
          std::unordered_set<std::string>& live_dst_paths,
          std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
          BackupID backup_id,
          bool shared,
          const std::string& src_dir,
          const std::string& src_fname,  // starts with "/"
          BackupRateLimiter* rate_limiter,
          uint64_t size_limit = 0,
          bool shared_checksum = false);

I
Igor Canadi 已提交
447 448
  // backup state data
  BackupID latest_backup_id_;
449 450 451 452 453
  std::map<BackupID, unique_ptr<BackupMeta>> backups_;
  std::map<BackupID,
           std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
  std::unordered_map<std::string,
                     std::shared_ptr<FileInfo>> backuped_file_infos_;
I
Igor Canadi 已提交
454
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
455 456 457 458 459 460

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

I
Igor Canadi 已提交
461 462 463 464 465 466
  // directories
  unique_ptr<Directory> backup_directory_;
  unique_ptr<Directory> shared_directory_;
  unique_ptr<Directory> meta_directory_;
  unique_ptr<Directory> private_directory_;

I
Igor Canadi 已提交
467 468
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
I
Igor Canadi 已提交
469
  bool read_only_;
470
  BackupStatistics backup_statistics_;
I
Igor Canadi 已提交
471 472
};

473
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
                          BackupEngine** backup_engine_ptr) {
  // The engine is owned by the unique_ptr until Initialize() succeeds. On
  // failure the caller receives nullptr and the engine is destroyed here.
  std::unique_ptr<BackupEngineImpl> engine(new BackupEngineImpl(env, options));
  Status init_status = engine->Initialize();
  if (init_status.ok()) {
    *backup_engine_ptr = engine.release();
    return Status::OK();
  }
  *backup_engine_ptr = nullptr;
  return init_status;
}

I
Igor Canadi 已提交
486
// Only stores configuration; no I/O happens until Initialize().
// latest_backup_id_ starts at 0 ("no backups") so it is never read
// uninitialized, even if Initialize() has not run or failed early.
// backup_env_ falls back to db_env_; this relies on db_env_ being declared
// (and therefore initialized) before backup_env_.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options,
                                   bool read_only)
    : initialized_(false),
      latest_backup_id_(0),
      stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {}

// Stops the background copy workers started by Initialize(), then flushes
// the info log.
BackupEngineImpl::~BackupEngineImpl() {
  // Closing the channel presumably makes each worker's read() loop return
  // false once drained, which lets the joins below complete.
  files_to_copy_.sendEof();
  std::for_each(threads_.begin(), threads_.end(),
                [](std::thread& worker) { worker.join(); });
  LogFlush(options_.info_log);
}
504

505 506 507
// Prepares the engine for use; must be called exactly once. Steps, in order:
//  1. (unless read-only) create the backup/shared/private/meta directories
//     and open Directory handles for them;
//  2. register a BackupMeta for every meta/ entry whose name is a canonical
//     backup ID, deleting (unless read-only) entries that are not;
//  3. either wipe everything (destroy_old_data) or load each backup's
//     metadata, moving unreadable backups into corrupt_backups_;
//  4. pick latest_backup_id_ and drop backups claiming a larger ID;
//  5. (unless read-only) rewrite the LATEST_BACKUP file;
//  6. start the background copy threads.
// Returns the first error from directory setup, listing meta/, the
// destroy_old_data cleanup, or writing LATEST_BACKUP. Failures to delete
// "later than latest" backups are only logged.
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
  if (read_only_) {
    Log(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // gather the list of directories that we need to create
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
      }
    }
  }

  std::vector<std::string> backup_meta_files;
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
    if (!s.ok()) {
      return s;
    }
  }
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    if (file == "." || file == "..") {
      continue;
    }
    Log(options_.info_log, "Detected backup %s", file.c_str());
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%" SCNu32, &backup_id);
    // Round-tripping through ToString() rejects names that are not the
    // canonical decimal form of a non-zero ID (leading zeros, trailing text,
    // unparsable names).
    if (backup_id == 0 || file != rocksdb::ToString(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
        Log(options_.info_log, "Unrecognized meta file %s, deleting -- %s",
            file.c_str(), s.ToString().c_str());
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    backups_.insert(std::make_pair(
        backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                       GetBackupMetaFile(backup_id), &backuped_file_infos_,
                       backup_env_))));
  }

  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    Log(options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
    // start from beginning
    latest_backup_id_ = 0;
  } else {  // Load data from storage
    // load the backups if any
    for (auto& backup : backups_) {
      Status s = backup.second->LoadFromFile(options_.backup_dir);
      if (!s.ok()) {
        Log(options_.info_log, "Backup %u corrupted -- %s", backup.first,
            s.ToString().c_str());
        corrupt_backups_.insert(std::make_pair(
            backup.first, std::make_pair(s, std::move(backup.second))));
      } else {
        Log(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
            backup.first, backup.second->GetInfoString().c_str());
      }
    }

    // Corrupt backups were moved out above; drop their now-empty slots.
    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(corrupt.first);
    }

    Status s = GetLatestBackupFileContents(&latest_backup_id_);

    // If latest backup file is corrupted or non-existent
    // set latest backup as the biggest backup we have
    // or 0 if we have no backups
    if (!s.ok() ||
        backups_.find(latest_backup_id_) == backups_.end()) {
      latest_backup_id_ = backups_.empty() ? 0 : backups_.rbegin()->first;
    }
  }

  Log(options_.info_log, "Latest backup is %u", latest_backup_id_);

  // delete any backups that claim to be later than latest.
  // upper_bound() avoids the wrap-around that latest_backup_id_ + 1 would
  // suffer at the maximum BackupID.
  std::vector<BackupID> later_ids;
  for (auto itr = backups_.upper_bound(latest_backup_id_);
       itr != backups_.end(); ++itr) {
    Log(options_.info_log,
        "Found backup claiming to be later than latest: %" PRIu32, itr->first);
    later_ids.push_back(itr->first);
  }
  for (auto id : later_ids) {
    Status s;
    if (!read_only_) {
      s = DeleteBackup(id);
    } else {
      auto backup = backups_.find(id);
      // We just found it couple of lines earlier!
      assert(backup != backups_.end());
      s = backup->second->Delete(false);
      backups_.erase(backup);
    }
    if (!s.ok()) {
      Log(options_.info_log, "Failed deleting backup %" PRIu32 " -- %s", id,
          s.ToString().c_str());
    }
  }

  if (!read_only_) {
    auto s = PutLatestBackupFileContents(latest_backup_id_);
    if (!s.ok()) {
      return s;
    }
  }

  // set up threads perform copies from files_to_copy_ in the background.
  // At least one worker is always started. Callers such as CreateNewBackup()
  // wait on the futures of the work items they enqueue, so with zero workers
  // those waits would never finish.
  const int num_threads = std::max(1, options_.max_background_operations);
  threads_.reserve(static_cast<size_t>(num_threads));
  for (int t = 0; t < num_threads; t++) {
    threads_.emplace_back([this]() {
      CopyWorkItem work_item;
      while (files_to_copy_.read(work_item)) {
        CopyResult result;
        result.status = CopyFile(work_item.src_path,
                                 work_item.dst_path,
                                 work_item.src_env,
                                 work_item.dst_env,
                                 work_item.sync,
                                 work_item.rate_limiter,
                                 &result.size,
                                 &result.checksum_value,
                                 work_item.size_limit);
        work_item.result.set_value(std::move(result));
      }
    });
  }

  Log(options_.info_log, "Initialized BackupEngine");

  return Status::OK();
}
I
Igor Canadi 已提交
677

I
Igor Canadi 已提交
678
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
679
  assert(initialized_);
I
Igor Canadi 已提交
680
  assert(!read_only_);
I
Igor Canadi 已提交
681 682 683 684
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;
685
  uint64_t sequence_number = db->GetLatestSequenceNumber();
I
Igor Canadi 已提交
686 687 688 689 690 691 692

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
693
  if (s.ok() && !flush_before_backup && options_.backup_log_files) {
I
Igor Canadi 已提交
694 695 696 697
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
698
    db->EnableFileDeletions(false);
I
Igor Canadi 已提交
699 700 701 702 703
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
704 705 706 707
  auto ret = backups_.insert(std::move(
      std::make_pair(new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                                        GetBackupMetaFile(new_backup_id),
                                        &backuped_file_infos_, backup_env_)))));
I
Igor Canadi 已提交
708 709
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
710 711
  new_backup->RecordTimestamp();
  new_backup->SetSequenceNumber(sequence_number);
I
Igor Canadi 已提交
712

713 714
  auto start_backup = backup_env_-> NowMicros();

I
Igor Canadi 已提交
715 716 717
  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

I
Igor Canadi 已提交
718 719 720
  // create temporary private dir
  s = backup_env_->CreateDir(
      GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
I
Igor Canadi 已提交
721

L
Lei Jin 已提交
722
  unique_ptr<BackupRateLimiter> rate_limiter;
I
Igor Canadi 已提交
723 724
  if (options_.backup_rate_limit > 0) {
    copy_file_buffer_size_ = options_.backup_rate_limit / 10;
L
Lei Jin 已提交
725 726
    rate_limiter.reset(new BackupRateLimiter(db_env_,
          options_.backup_rate_limit, copy_file_buffer_size_));
I
Igor Canadi 已提交
727 728
  }

729 730 731 732 733 734 735 736 737
  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live files shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;
  live_dst_paths.reserve(live_files.size() + live_wal_files.size());

  std::vector<BackupAfterCopyWorkItem> backup_items_to_finish;
  // Add a CopyWorkItem to the channel for each live file
I
Igor Canadi 已提交
738 739 740 741
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
I
Igor Canadi 已提交
742 743 744 745
    if (!ok) {
      assert(false);
      return Status::Corruption("Can't parse file name. This is very bad");
    }
I
Igor Canadi 已提交
746
    // we should only get sst, manifest and current files here
I
Igor Canadi 已提交
747 748
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile);
I
Igor Canadi 已提交
749 750

    // rules:
751
    // * if it's kTableFile, then it's shared
I
Igor Canadi 已提交
752
    // * if it's kDescriptorFile, limit the size to manifest_file_size
753 754 755 756 757 758 759 760 761 762 763 764
    s = AddBackupFileWorkItem(
            live_dst_paths,
            backup_items_to_finish,
            new_backup_id,
            options_.share_table_files && type == kTableFile,
            db->GetName(),
            live_files[i],
            rate_limiter.get(),
            (type == kDescriptorFile) ? manifest_file_size : 0,
            options_.share_files_with_checksum && type == kTableFile);
  }
  // Add a CopyWorkItem to the channel for each WAL file
I
Igor Canadi 已提交
765 766 767 768
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into backup_dir/files/<new backup>/
769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795
      s = AddBackupFileWorkItem(live_dst_paths,
                                backup_items_to_finish,
                                new_backup_id,
                                false, /* not shared */
                                db->GetOptions().wal_dir,
                                live_wal_files[i]->PathName(),
                                rate_limiter.get());
    }
  }

  Status item_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    if (item_status.ok() && item.shared && item.needed_to_copy) {
      item_status = item.backup_env->RenameFile(item.dst_path_tmp,
                                                item.dst_path);
    }
    if (item_status.ok()) {
      item_status = new_backup.get()->AddFile(
              std::make_shared<FileInfo>(item.dst_relative,
                                         result.size,
                                         result.checksum_value));
    }
    if (!item_status.ok()) {
      s = item_status;
I
Igor Canadi 已提交
796 797 798 799
    }
  }

  // we copied all the files, enable file deletions
800
  db->EnableFileDeletions(false);
I
Igor Canadi 已提交
801

I
Igor Canadi 已提交
802 803
  if (s.ok()) {
    // move tmp private backup to real backup folder
I
Igor Canadi 已提交
804 805 806 807
    Log(options_.info_log,
        "Moving tmp backup directory to the real one: %s -> %s\n",
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)).c_str(),
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)).c_str());
I
Igor Canadi 已提交
808
    s = backup_env_->RenameFile(
809
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)),  // tmp
I
Igor Canadi 已提交
810 811 812
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
  }

813 814
  auto backup_time = backup_env_->NowMicros() - start_backup;

I
Igor Canadi 已提交
815 816
  if (s.ok()) {
    // persist the backup metadata on the disk
817
    s = new_backup->StoreToFile(options_.sync);
I
Igor Canadi 已提交
818 819 820 821 822
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
I
Igor Canadi 已提交
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844
  if (s.ok() && options_.sync) {
    unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      backup_private_directory->Fsync();
    }
    if (private_directory_ != nullptr) {
      private_directory_->Fsync();
    }
    if (meta_directory_ != nullptr) {
      meta_directory_->Fsync();
    }
    if (shared_directory_ != nullptr) {
      shared_directory_->Fsync();
    }
    if (backup_directory_ != nullptr) {
      backup_directory_->Fsync();
    }
  }

845 846 847
  if (s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
  }
I
Igor Canadi 已提交
848
  if (!s.ok()) {
849
    backup_statistics_.IncrementNumberFailBackup();
I
Igor Canadi 已提交
850 851
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
852 853
    Log(options_.info_log, "Backup Statistics %s\n",
        backup_statistics_.ToString().c_str());
I
Igor Canadi 已提交
854 855 856
    // delete files that we might have already written
    DeleteBackup(new_backup_id);
    GarbageCollect();
I
Igor Canadi 已提交
857 858 859 860 861 862 863
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");
864 865

  // backup_speed is in byte/second
866
  double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
867
  Log(options_.info_log, "Backup number of files: %u",
868
      new_backup->GetNumberFiles());
I
Igor Canadi 已提交
869 870 871
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  Log(options_.info_log, "Backup size: %s", human_size);
I
Igor Canadi 已提交
872
  Log(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time);
873 874 875
  Log(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  Log(options_.info_log, "Backup Statistics %s",
      backup_statistics_.ToString().c_str());
I
Igor Canadi 已提交
876 877 878
  return s;
}

I
Igor Canadi 已提交
879
// Deletes backups taken from the front of backups_ until at most
// `num_backups_to_keep` remain. Stops at, and returns, the first failure
// reported by DeleteBackup(); returns OK otherwise.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(initialized_);
  assert(!read_only_);
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);

  // Collect the ids first: DeleteBackup() erases entries from backups_, which
  // would invalidate an iterator we were still walking.
  std::vector<BackupID> victims;
  for (auto it = backups_.begin();
       (backups_.size() - victims.size()) > num_backups_to_keep; ++it) {
    victims.push_back(it->first);
  }

  for (const BackupID id : victims) {
    Status status = DeleteBackup(id);
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}

I
Igor Canadi 已提交
899
// Removes backup `backup_id`, whether it is a valid backup (backups_) or one
// tracked in corrupt_backups_. Its metadata is released through
// BackupMeta::Delete(). Every backed-up file whose refcount then is zero is
// deleted from backup_env_ and dropped from backuped_file_infos_. Finally an
// attempt is made to remove the backup's private directory.
//
// Returns NotFound if the id is unknown and the error from Delete() if that
// fails. Failures deleting individual files or the private directory are only
// logged; OK is returned in that case.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  assert(initialized_);
  assert(!read_only_);
  Log(options_.info_log, "Deleting backup %u", backup_id);

  auto live_it = backups_.find(backup_id);
  if (live_it == backups_.end()) {
    // Not a valid backup -- it may still be one of the corrupt ones.
    auto corrupt_it = corrupt_backups_.find(backup_id);
    if (corrupt_it == corrupt_backups_.end()) {
      return Status::NotFound("Backup not found");
    }
    Status meta_status = corrupt_it->second.second->Delete();
    if (!meta_status.ok()) {
      return meta_status;
    }
    corrupt_backups_.erase(corrupt_it);
  } else {
    Status meta_status = live_it->second->Delete();
    if (!meta_status.ok()) {
      return meta_status;
    }
    backups_.erase(live_it);
  }

  // Delete files that no backup references anymore. Erasing from
  // backuped_file_infos_ is deferred to a second pass so the container is not
  // modified while it is being iterated.
  std::vector<std::string> unreferenced;
  for (auto& entry : backuped_file_infos_) {
    if (entry.second->refs != 0) {
      continue;
    }
    Status del_status = backup_env_->DeleteFile(GetAbsolutePath(entry.first));
    Log(options_.info_log, "Deleting %s -- %s", entry.first.c_str(),
        del_status.ToString().c_str());
    unreferenced.push_back(entry.first);
  }
  for (const auto& rel_name : unreferenced) {
    backuped_file_infos_.erase(rel_name);
  }

  // Private dir: if it is not empty this fails, and GarbageCollect() takes
  // care of it later.
  const std::string private_dir = GetPrivateFileRel(backup_id);
  Status dir_status = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
  Log(options_.info_log, "Deleting private dir %s -- %s",
      private_dir.c_str(), dir_status.ToString().c_str());
  return Status::OK();
}

I
Igor Canadi 已提交
944
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
945
  assert(initialized_);
I
Igor Canadi 已提交
946 947
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
948
    if (!backup.second->Empty()) {
949
        backup_info->push_back(BackupInfo(
950 951 952
            backup.first, backup.second->GetTimestamp(),
            backup.second->GetSize(),
            backup.second->GetNumberFiles()));
I
Igor Canadi 已提交
953 954 955 956
    }
  }
}

H
Hasnain Lakhani 已提交
957 958 959
void
BackupEngineImpl::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
960
  assert(initialized_);
H
Hasnain Lakhani 已提交
961 962 963 964 965 966
  corrupt_backup_ids->reserve(corrupt_backups_.size());
  for (auto& backup : corrupt_backups_) {
    corrupt_backup_ids->push_back(backup.first);
  }
}

967 968 969
// Restores backup `backup_id` into `db_dir`; log files go to `wal_dir`.
//
// Preparation:
//  * With restore_options.keep_log_files, files in db_dir other than log files
//    are deleted and log files found in the archive directory of wal_dir are
//    moved back into wal_dir (a failed move aborts the restore).
//  * Otherwise wal_dir, its archive directory and db_dir are emptied.
//
// Every backed-up file is then handed to the copy worker threads and its
// crc32c is compared with the one recorded in the backup.
//
// Returns:
//  * the stored load error for a backup in corrupt_backups_,
//  * NotFound for an unknown or empty backup,
//  * Corruption("Backup corrupted") if a backed-up file name cannot be parsed,
//  * the first copy error, or Corruption("Checksum check failed").
//
// All copy work already queued is waited for before returning, even on error,
// so no worker thread is still writing into db_dir / wal_dir once the caller
// regains control.
Status BackupEngineImpl::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  assert(initialized_);
  auto corrupt_itr = corrupt_backups_.find(backup_id);
  if (corrupt_itr != corrupt_backups_.end()) {
    return corrupt_itr->second.first;
  }
  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  if (backup->Empty()) {
    return Status::NotFound("Backup not found");
  }

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);
  Log(options_.info_log, "keep_log_files: %d\n",
      static_cast<int>(restore_options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  if (restore_options.keep_log_files) {
    // delete files in db_dir, but keep all the log files
    DeleteChildren(db_dir, 1 << kLogFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archive_files;
    db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
    for (const auto& f : archive_files) {
      uint64_t number;
      FileType type;
      bool ok = ParseFileName(f, &number, &type);
      if (ok && type == kLogFile) {
        Log(options_.info_log, "Moving log file from archive/ to wal_dir: %s",
            f.c_str());
        Status s =
            db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
        if (!s.ok()) {
          // if we can't move log file from archive_dir to wal_dir,
          // we should fail, since it might mean data loss
          return s;
        }
      }
    }
  } else {
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  }

  unique_ptr<BackupRateLimiter> rate_limiter;
  if (options_.restore_rate_limit > 0) {
    // NOTE: this member assignment also affects later copies/checksums.
    copy_file_buffer_size_ = options_.restore_rate_limit / 10;
    rate_limiter.reset(new BackupRateLimiter(db_env_,
          options_.restore_rate_limit, copy_file_buffer_size_));
  }
  Status s;
  std::vector<RestoreAfterCopyWorkItem> restore_items_to_finish;
  for (const auto& file_info : backup->GetFiles()) {
    const std::string &file = file_info->filename;
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file>, shared_checksum/<file_crc32_size>
    // or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // if the file was in shared_checksum, extract the real file name
    // in this case the file is <number>_<checksum>_<size>.<type>
    if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
      dst = GetFileFromChecksumFile(dst);
    }

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      // Don't return yet: files queued so far are still being copied by the
      // worker threads; wait for them below.
      s = Status::Corruption("Backup corrupted");
      break;
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    CopyWorkItem copy_work_item(GetAbsolutePath(file),
                                dst,
                                backup_env_,
                                db_env_,
                                false,
                                rate_limiter.get(),
                                0 /* size_limit */);
    RestoreAfterCopyWorkItem after_copy_work_item(
            copy_work_item.result.get_future(),
            file_info->checksum_value);
    files_to_copy_.write(std::move(copy_work_item));
    restore_items_to_finish.push_back(std::move(after_copy_work_item));
  }

  // Wait for every queued copy. Only the first failure is kept: either the
  // parse error above or the earliest copy/checksum failure in file order.
  for (auto& item : restore_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    if (!s.ok()) {
      continue;  // already failed; just drain the remaining copies
    }
    if (!result.status.ok()) {
      s = result.status;
    } else if (item.checksum_value != result.checksum_value) {
      s = Status::Corruption("Checksum check failed");
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}

// The LATEST_BACKUP file holds the id of the latest backup as ASCII decimal text
I
Igor Canadi 已提交
1092
// Reads the LATEST_BACKUP file, parses the decimal backup id it holds into
// *latest_backup, and checks that the corresponding backup meta file exists.
//
// Returns:
//  * the open/read error,
//  * Corruption if the file is empty or names a backup whose meta file does
//    not exist,
//  * any other error reported by FileExists().
// *latest_backup is 0 if the contents do not start with a number.
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
  Status s;
  unique_ptr<SequentialFile> file;
  s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
                                     &file,
                                     EnvOptions());
  if (!s.ok()) {
    return s;
  }

  char buf[10];
  Slice data;
  unique_ptr<SequentialFileReader> file_reader(
      new SequentialFileReader(std::move(file)));

  s = file_reader->Read(sizeof(buf), &data, buf);
  if (!s.ok() || data.size() == 0) {
    return s.ok() ? Status::Corruption("Latest backup file corrupted") : s;
  }
  // Read() is allowed to return a Slice that does not point into `buf`, so
  // NUL-terminating `buf` would not terminate `data`. Parse from an owning,
  // NUL-terminated copy instead.
  const std::string contents = data.ToString();

  *latest_backup = 0;
  sscanf(contents.c_str(), "%u", latest_backup);

  s = backup_env_->FileExists(GetBackupMetaFile(*latest_backup));
  if (s.IsNotFound()) {
    s = Status::Corruption("Latest backup file corrupted");
  }
  return s;
}

// this operation HAS to be atomic
// the file only holds a short ASCII backup id, but we should still *never*
// do something like 1. delete file, 2. write new file
// Instead we write to a tmp file and then atomically rename it over the real one
I
Igor Canadi 已提交
1127
// Atomically points the LATEST_BACKUP file at `latest_backup`.
//
// Steps:
//  1. Write "<id>\n" to the tmp variant of the file.
//  2. Sync it when options_.sync is set.
//  3. Close it.
//  4. Rename it over the real file.
//
// Returns the first create/append/sync/close/rename error. If the tmp file
// cannot be created, a deletion of the tmp path is attempted.
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
  assert(!read_only_);
  Status s;
  unique_ptr<WritableFile> file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = backup_env_->NewWritableFile(GetLatestBackupFile(true),
                                   &file,
                                   env_options);
  if (!s.ok()) {
    backup_env_->DeleteFile(GetLatestBackupFile(true));
    return s;
  }

  unique_ptr<WritableFileWriter> file_writer(
      new WritableFileWriter(std::move(file), env_options));
  // Must fit the widest uint32_t ("4294967295", 10 digits) plus '\n' and the
  // terminating NUL. A smaller buffer makes snprintf truncate while still
  // returning the untruncated length, so the Slice below would over-read.
  char file_contents[16];
  int len =
      snprintf(file_contents, sizeof(file_contents), "%u\n", latest_backup);
  assert(len > 0 && static_cast<size_t>(len) < sizeof(file_contents));
  s = file_writer->Append(Slice(file_contents, len));
  if (s.ok() && options_.sync) {
    // Propagate sync failures; otherwise a non-durable file would be renamed
    // into place as if it had been synced.
    s = file_writer->Sync(false);
  }
  if (s.ok()) {
    s = file_writer->Close();
  }
  if (s.ok()) {
    // atomically replace real file with new tmp
    s = backup_env_->RenameFile(GetLatestBackupFile(true),
                                GetLatestBackupFile(false));
  }
  return s;
}

L
Lei Jin 已提交
1161 1162 1163 1164 1165 1166 1167
// Copies `src` (read via `src_env`) to `dst` (written via `dst_env`).
//
// Behavior:
//  * At most `size_limit` bytes are copied (0 means no limit), read in chunks
//    of copy_file_buffer_size_ bytes.
//  * If `size` / `checksum_value` are non-null, they receive the number of
//    bytes copied and the crc32c of those bytes (both reset to 0 first).
//  * Each chunk's size is reported to `rate_limiter` when one is given.
//  * The destination is synced when `sync` is true and is always closed
//    explicitly.
//
// Returns Incomplete("Backup stopped") if stop_backup_ is set during the copy,
// otherwise the first open/read/append/sync/close error (OK on success).
Status BackupEngineImpl::CopyFile(
    const std::string& src,
    const std::string& dst, Env* src_env,
    Env* dst_env, bool sync,
    BackupRateLimiter* rate_limiter, uint64_t* size,
    uint32_t* checksum_value,
    uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  env_options.use_os_buffer = false;
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  unique_ptr<WritableFileWriter> dest_writer(
      new WritableFileWriter(std::move(dst_file), env_options));
  unique_ptr<SequentialFileReader> src_reader(
      new SequentialFileReader(std::move(src_file)));
  unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_reader->Read(buffer_to_read, &data, buf.get());
    size_limit -= data.size();

    if (!s.ok()) {
      return s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
                                       data.size());
    }
    s = dest_writer->Append(data);
    if (rate_limiter != nullptr) {
      rate_limiter->ReportAndWait(data.size());
    }
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dest_writer->Sync(false);
  }
  if (s.ok()) {
    // Close explicitly and propagate the result. Otherwise the file is only
    // closed when dest_writer is destroyed, where no Status can reach the
    // caller. An error writing out the last appended bytes would then go
    // unreported, notably on restore, which copies with sync == false.
    s = dest_writer->Close();
  }

  return s;
}

// src_fname will always start with "/"
1235 1236 1237 1238 1239 1240 1241 1242 1243 1244
// Prepares the backup of one live file (src_dir + src_fname; src_fname always
// starts with "/") for backup `backup_id`. It appends a matching
// BackupAfterCopyWorkItem to `backup_items_to_finish`.
//
// Destination (relative to the backup directory):
//  * shared && shared_checksum: the shared-checksum directory, with the file's
//    crc32c and size folded into the name;
//  * shared: the shared directory;
//  * otherwise: the private directory of `backup_id`.
// Copies are written to a tmp path (dst_path_tmp) first.
//
// A shared file is NOT copied again if another live file already maps to the
// same destination (tracked in `live_dst_paths`), or if it already exists and
// is either checksum-named or referenced by some backup. In that case its size
// and checksum come from the source file and are delivered through an
// already-fulfilled future. A shared, non-checksum file that exists but is not
// referenced by any backup is deleted and copied again.
//
// At most `size_limit` bytes are copied/checksummed (0 means the whole file).
// Returns the checksum/GetFileSize error in the shared_checksum case, any
// FileExists() error other than NotFound, or the checksum error for an
// already-present file; OK otherwise.
Status BackupEngineImpl::AddBackupFileWorkItem(
        std::unordered_set<std::string>& live_dst_paths,
        std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
        BackupID backup_id,
        bool shared,
        const std::string& src_dir,
        const std::string& src_fname,
        BackupRateLimiter* rate_limiter,
        uint64_t size_limit,
        bool shared_checksum) {
  assert(src_fname.size() > 0 && src_fname[0] == '/');
  std::string dst_relative = src_fname.substr(1);
  std::string dst_relative_tmp;
  Status s;
  // Initialized because the GetFileSize() call in the "already present"
  // branch below ignores its error; without this, a failure there would
  // report an indeterminate size.
  uint64_t size = 0;
  uint32_t checksum_value = 0;

  if (shared && shared_checksum) {
    // add checksum and file length to the file name
    s = CalculateChecksum(src_dir + src_fname,
                          db_env_,
                          size_limit,
                          &checksum_value);
    if (s.ok()) {
      s = db_env_->GetFileSize(src_dir + src_fname, &size);
    }
    if (!s.ok()) {
      return s;
    }
    dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_value,
                                             size);
    dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
    dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  } else if (shared) {
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
  } else {
    dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
  }
  std::string dst_path = GetAbsolutePath(dst_relative);
  std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);

  // if it's shared, we also need to check if it exists -- if it does, no need
  // to copy it again.
  bool need_to_copy = true;
  // true if dst_path is the same path as another live file
  const bool same_path =
      live_dst_paths.find(dst_path) != live_dst_paths.end();

  bool file_exists = false;
  if (shared && !same_path) {
    Status exist = backup_env_->FileExists(dst_path);
    if (exist.ok()) {
      file_exists = true;
    } else if (exist.IsNotFound()) {
      file_exists = false;
    } else {
      // Anything other than OK / NotFound is expected to be an IOError. The
      // assertion must inspect `exist`: `s` is always OK at this point.
      assert(exist.IsIOError());
      return exist;
    }
  }

  if (shared && (same_path || file_exists)) {
    need_to_copy = false;
    if (shared_checksum) {
      Log(options_.info_log,
          "%s already present, with checksum %u and size %" PRIu64,
          src_fname.c_str(), checksum_value, size);
    } else if (backuped_file_infos_.find(dst_relative) ==
               backuped_file_infos_.end() && !same_path) {
      // file already exists, but it's not referenced by any backup. overwrite
      // the file
      Log(options_.info_log,
          "%s already present, but not referenced by any backup. We will "
          "overwrite the file.",
          src_fname.c_str());
      need_to_copy = true;
      backup_env_->DeleteFile(dst_path);
    } else {
      // the file is present and referenced by a backup
      db_env_->GetFileSize(src_dir + src_fname, &size);  // Ignore error
      Log(options_.info_log, "%s already present, calculate checksum",
          src_fname.c_str());
      s = CalculateChecksum(src_dir + src_fname, db_env_, size_limit,
                            &checksum_value);
    }
  }
  live_dst_paths.insert(dst_path);

  if (need_to_copy) {
    Log(options_.info_log, "Copying %s to %s", src_fname.c_str(),
            dst_path_tmp.c_str());
    CopyWorkItem copy_work_item(src_dir + src_fname,
                                dst_path_tmp,
                                db_env_,
                                backup_env_,
                                options_.sync,
                                rate_limiter,
                                size_limit);
    BackupAfterCopyWorkItem after_copy_work_item(
            copy_work_item.result.get_future(),
            shared,
            need_to_copy,
            backup_env_,
            dst_path_tmp,
            dst_path,
            dst_relative);
    files_to_copy_.write(std::move(copy_work_item));
    backup_items_to_finish.push_back(std::move(after_copy_work_item));
  } else {
    // No copy needed: hand back an already-fulfilled result so the caller can
    // treat every item uniformly.
    std::promise<CopyResult> promise_result;
    BackupAfterCopyWorkItem after_copy_work_item(
            promise_result.get_future(),
            shared,
            need_to_copy,
            backup_env_,
            dst_path_tmp,
            dst_path,
            dst_relative);
    backup_items_to_finish.push_back(std::move(after_copy_work_item));
    CopyResult result;
    result.status = s;
    result.size = size;
    result.checksum_value = checksum_value;
    promise_result.set_value(std::move(result));
  }
  return s;
}

I
Igor Canadi 已提交
1365 1366 1367
// Computes the crc32c of the first `size_limit` bytes of `src` (the whole
// file when size_limit == 0), reading through `src_env` in chunks of
// copy_file_buffer_size_ bytes. *checksum_value is reset to 0 up front and
// then extended chunk by chunk.
//
// Returns Incomplete("Backup stopped") if stop_backup_ is raised while
// reading, otherwise the open/read status.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;
  uint64_t remaining =
      (size_limit == 0) ? std::numeric_limits<uint64_t>::max() : size_limit;

  EnvOptions read_options;
  read_options.use_mmap_writes = false;
  read_options.use_os_buffer = false;

  std::unique_ptr<SequentialFile> raw_file;
  Status status = src_env->NewSequentialFile(src, &raw_file, read_options);
  if (!status.ok()) {
    return status;
  }
  unique_ptr<SequentialFileReader> reader(
      new SequentialFileReader(std::move(raw_file)));
  std::unique_ptr<char[]> scratch(new char[copy_file_buffer_size_]);

  for (;;) {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    const size_t chunk = (copy_file_buffer_size_ < remaining)
                             ? copy_file_buffer_size_
                             : remaining;
    Slice piece;
    status = reader->Read(chunk, &piece, scratch.get());
    if (!status.ok()) {
      return status;
    }
    remaining -= piece.size();
    *checksum_value =
        crc32c::Extend(*checksum_value, piece.data(), piece.size());
    // Stop at end of file or once the requested prefix has been covered.
    if (piece.size() == 0 || remaining == 0) {
      break;
    }
  }
  return status;
}

1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) {
  std::vector<std::string> children;
  db_env_->GetChildren(dir, &children);  // ignore errors

  for (const auto& f : children) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(f, &number, &type);
    if (ok && (file_type_filter & (1 << type))) {
      // don't delete this file
      continue;
    }
    db_env_->DeleteFile(dir + "/" + f);  // ignore errors
  }
}

H
Hasnain Lakhani 已提交
1424
// Deletes backup artifacts that nothing references anymore:
//  * Shared directory: entries missing from backuped_file_infos_ or with a
//    zero refcount. They are deleted and removed from backuped_file_infos_.
//  * Private directory: sub-directories whose name contains ".tmp", or whose
//    leading number is a non-zero id absent from backups_. Their contents are
//    deleted, then the directory itself. Non-tmp names that do not parse to a
//    non-zero id are left alone.
// Individual deletion failures are only logged. An error is returned only
// when listing the shared or private directory fails.
Status BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);
  Log(options_.info_log, "Starting garbage collection");

  // --- obsolete shared files ---
  std::vector<std::string> shared_entries;
  Status list_status = backup_env_->GetChildren(
      GetAbsolutePath(GetSharedFileRel()), &shared_entries);
  if (!list_status.ok()) {
    return list_status;
  }
  for (const auto& entry : shared_entries) {
    const std::string rel_fname = GetSharedFileRel(entry);
    auto info = backuped_file_infos_.find(rel_fname);
    const bool referenced =
        info != backuped_file_infos_.end() && info->second->refs != 0;
    if (referenced) {
      continue;
    }
    // `entry` may be a directory; DeleteFile() then just fails, which is fine.
    Status del_status = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
    Log(options_.info_log, "Deleting %s -- %s", rel_fname.c_str(),
        del_status.ToString().c_str());
    backuped_file_infos_.erase(rel_fname);
  }

  // --- obsolete private directories ---
  std::vector<std::string> private_entries;
  list_status = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                                         &private_entries);
  if (!list_status.ok()) {
    return list_status;
  }
  for (const auto& entry : private_entries) {
    const bool is_tmp = entry.find(".tmp") != std::string::npos;
    BackupID id = 0;
    sscanf(entry.c_str(), "%u", &id);
    // A tmp dir is always removed. Otherwise keep entries that are not a
    // number or that belong to a live backup.
    if (!is_tmp && (id == 0 || backups_.count(id) > 0)) {
      continue;
    }
    const std::string dir_path =
        GetAbsolutePath(GetPrivateFileRel(id, is_tmp));
    std::vector<std::string> contents;
    backup_env_->GetChildren(dir_path, &contents);
    for (const auto& name : contents) {
      const std::string victim = dir_path + name;
      Status del_status = backup_env_->DeleteFile(victim);
      Log(options_.info_log, "Deleting %s -- %s", victim.c_str(),
          del_status.ToString().c_str());
    }
    Status dir_status = backup_env_->DeleteDir(dir_path);
    Log(options_.info_log, "Deleting dir %s -- %s", dir_path.c_str(),
        dir_status.ToString().c_str());
  }

  return Status::OK();
}

// ------- BackupMeta class --------

1491 1492 1493
// Registers `file_info` as part of this backup.
//
// Two cases, keyed by file name in the file_infos_ map:
//  * Name not yet present: `file_info` itself is inserted with refcount 1.
//  * Name already present: the existing entry is reused and its refcount
//    bumped, provided its checksum matches; a mismatch is reported as
//    Corruption.
// On success the backup's size grows by file_info->size and the entry stored
// in file_infos_ is appended to files_.
Status BackupEngineImpl::BackupMeta::AddFile(
    std::shared_ptr<FileInfo> file_info) {
  auto found = file_infos_->find(file_info->filename);
  if (found != file_infos_->end()) {
    if (found->second->checksum_value != file_info->checksum_value) {
      return Status::Corruption(
          "Checksum mismatch for existing backup file. Delete old backups and "
          "try again.");
    }
    ++found->second->refs;  // one more backup references this file
  } else {
    auto inserted = file_infos_->insert({file_info->filename, file_info});
    if (!inserted.second) {
      // find() just reported the key as absent, so this indicates a serious
      // in-memory inconsistency.
      return Status::Corruption("In memory metadata insertion error");
    }
    found = inserted.first;
    found->second->refs = 1;
  }

  size_ += file_info->size;
  files_.push_back(found->second);
  return Status::OK();
}

1518 1519
// Drops this backup's reference to each of its files, forgets them, and
// resets timestamp_.
//
// When `delete_meta` is true the meta file is removed as well; a meta file
// that does not exist is not an error. Returns OK, or the FileExists() /
// DeleteFile() error encountered while removing the meta file.
Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
  for (auto& entry : files_) {
    --entry->refs;  // this backup no longer references the file
  }
  files_.clear();

  Status result;
  if (delete_meta) {
    Status exists = env_->FileExists(meta_filename_);
    if (exists.ok()) {
      result = env_->DeleteFile(meta_filename_);
    } else if (!exists.IsNotFound()) {
      result = exists;  // propagate errors other than "not found"
    }
    // NotFound: nothing to delete, result stays OK
  }
  timestamp_ = 0;
  return result;
}

// each backup meta file is of the format:
// <timestamp>
1539
// <seq number>
I
Igor Canadi 已提交
1540
// <number of files>
L
Lei Jin 已提交
1541 1542
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
I
Igor Canadi 已提交
1543
// ...
I
Igor Canadi 已提交
1544 1545
Status BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir) {
I
Igor Canadi 已提交
1546 1547 1548 1549 1550 1551 1552 1553
  assert(Empty());
  Status s;
  unique_ptr<SequentialFile> backup_meta_file;
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

1554 1555
  unique_ptr<SequentialFileReader> backup_meta_reader(
      new SequentialFileReader(std::move(backup_meta_file)));
1556 1557
  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  Slice data;
1558
  s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
I
Igor Canadi 已提交
1559 1560

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
L
Lei Jin 已提交
1561
    return s.ok() ? Status::Corruption("File size too big") : s;
I
Igor Canadi 已提交
1562 1563 1564 1565
  }
  buf[data.size()] = 0;

  uint32_t num_files = 0;
1566 1567 1568 1569 1570
  char *next;
  timestamp_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  sequence_number_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1571
  num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1572
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
I
Igor Canadi 已提交
1573

1574
  std::vector<std::shared_ptr<FileInfo>> files;
1575

1576 1577
  Slice checksum_prefix("crc32 ");

I
Igor Canadi 已提交
1578
  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
L
Lei Jin 已提交
1579 1580 1581
    auto line = GetSliceUntil(&data, '\n');
    std::string filename = GetSliceUntil(&line, ' ').ToString();

I
Igor Canadi 已提交
1582
    uint64_t size;
1583 1584
    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
    if (file_info) {
1585 1586 1587 1588 1589 1590
      size = file_info->size;
    } else {
      s = env_->GetFileSize(backup_dir + "/" + filename, &size);
      if (!s.ok()) {
        return s;
      }
I
Igor Canadi 已提交
1591
    }
L
Lei Jin 已提交
1592 1593

    if (line.empty()) {
I
Igor Canadi 已提交
1594 1595
      return Status::Corruption("File checksum is missing for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
1596 1597 1598
    }

    uint32_t checksum_value = 0;
1599 1600
    if (line.starts_with(checksum_prefix)) {
      line.remove_prefix(checksum_prefix.size());
1601 1602
      checksum_value = static_cast<uint32_t>(
          strtoul(line.data(), nullptr, 10));
S
sdong 已提交
1603
      if (line != rocksdb::ToString(checksum_value)) {
I
Igor Canadi 已提交
1604 1605
        return Status::Corruption("Invalid checksum value for " + filename +
                                  " in " + meta_filename_);
L
Lei Jin 已提交
1606 1607
      }
    } else {
I
Igor Canadi 已提交
1608 1609
      return Status::Corruption("Unknown checksum type for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
1610 1611
    }

1612
    files.emplace_back(new FileInfo(filename, size, checksum_value));
1613 1614
  }

I
Igor Canadi 已提交
1615 1616
  if (s.ok() && data.size() > 0) {
    // file has to be read completely. if not, we count it as corruption
I
Igor Canadi 已提交
1617 1618
    s = Status::Corruption("Tailing data in backup meta file in " +
                           meta_filename_);
I
Igor Canadi 已提交
1619 1620
  }

1621
  if (s.ok()) {
1622
    files_.reserve(files.size());
L
Lei Jin 已提交
1623 1624 1625 1626 1627
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
1628
    }
I
Igor Canadi 已提交
1629 1630 1631 1632 1633
  }

  return s;
}

I
Igor Canadi 已提交
1634
// Serializes this backup's metadata in the meta-file format
// (<timestamp>\n<seq number>\n<number of files>\n then one
// "<file> crc32 <crc32_value>\n" line per file).
//
// Write sequence:
// 1. Write the text to "<meta_filename_>.tmp".
// 2. Sync it when `sync` is true.
// 3. Close it and rename it over meta_filename_.
//
// The whole text must fit in max_backup_meta_file_size_ bytes (including room
// for snprintf's NUL). If it does not, nothing is appended and
// InvalidArgument is returned; the temporary file may be left behind.
// Otherwise the first Env error from Append/Sync/Close/Rename is returned.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  const size_t buf_size = max_backup_meta_file_size_;
  unique_ptr<char[]> buf(new char[buf_size]);
  size_t len = 0;
  // snprintf returns the length it *would* have produced, not what it wrote.
  // Accept a chunk only if it fit completely. Previously `len` kept growing
  // past the buffer, and the next `buf_size - len` turned into a huge size,
  // overflowing `buf`.
  auto advance = [&len, buf_size](int written) -> bool {
    if (written < 0 || static_cast<size_t>(written) >= buf_size - len) {
      return false;
    }
    len += static_cast<size_t>(written);
    return true;
  };

  bool fits =
      advance(snprintf(buf.get() + len, buf_size - len, "%" PRId64 "\n",
                       timestamp_)) &&
      advance(snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
                       sequence_number_)) &&
      advance(snprintf(buf.get() + len, buf_size - len,
                       "%" ROCKSDB_PRIszt "\n", files_.size()));
  for (const auto& file : files_) {
    if (!fits) {
      break;
    }
    // use crc32 for now, switch to something else if needed
    fits = advance(snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
                            file->filename.c_str(), file->checksum_value));
  }
  if (!fits) {
    return Status::InvalidArgument(
        "Backup meta data exceeds the maximum meta file size for " +
        meta_filename_);
  }

  s = backup_meta_file->Append(Slice(buf.get(), len));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}

I
Igor Canadi 已提交
1671 1672 1673
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
 public:
I
Igor Canadi 已提交
1674 1675 1676
  BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
      : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}

I
Igor Canadi 已提交
1677 1678
  virtual ~BackupEngineReadOnlyImpl() {}

I
Igor Sugak 已提交
1679
  virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
I
Igor Canadi 已提交
1680 1681 1682
    backup_engine_->GetBackupInfo(backup_info);
  }

I
Igor Sugak 已提交
1683 1684
  virtual void GetCorruptedBackups(
      std::vector<BackupID>* corrupt_backup_ids) override {
H
Hasnain Lakhani 已提交
1685 1686 1687
    backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
  }

I
Igor Canadi 已提交
1688 1689
  virtual Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1690
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1691 1692 1693 1694 1695 1696
    return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                               restore_options);
  }

  virtual Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1697
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1698 1699 1700 1701
    return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                     restore_options);
  }

1702 1703
  Status Initialize() { return backup_engine_->Initialize(); }

I
Igor Canadi 已提交
1704
 private:
I
Igor Canadi 已提交
1705
  std::unique_ptr<BackupEngineImpl> backup_engine_;
I
Igor Canadi 已提交
1706 1707
};

I
Igor Canadi 已提交
1708 1709 1710 1711 1712 1713
// Creates and initializes a read-only backup engine.
// On success, stores the new engine in *backup_engine_ptr (the caller owns it)
// and returns OK.
// On failure:
// - Returns InvalidArgument if `options.destroy_old_data` is set (in that case
//   *backup_engine_ptr is left untouched).
// - Returns the Initialize() error after setting *backup_engine_ptr to nullptr.
Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
                                  BackupEngineReadOnly** backup_engine_ptr) {
  if (options.destroy_old_data) {
    return Status::InvalidArgument(
        "Can't destroy old data with ReadOnly BackupEngine");
  }

  std::unique_ptr<BackupEngineReadOnlyImpl> engine(
      new BackupEngineReadOnlyImpl(env, options));
  Status init_status = engine->Initialize();
  if (init_status.ok()) {
    *backup_engine_ptr = engine.release();  // ownership passes to the caller
    return Status::OK();
  }
  *backup_engine_ptr = nullptr;
  return init_status;
}

I
Igor Canadi 已提交
1725 1726
// --- BackupableDB methods --------

1727
// Wraps `db` and creates a backup engine on the DB's Env.
// The engine's Initialize() result is kept in status_ rather than reported
// here; methods that check status_ return it (or do nothing) when it is not OK.
// The engine is deleted in the destructor.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
    : StackableDB(db) {
  BackupEngineImpl* engine = new BackupEngineImpl(db->GetEnv(), options);
  status_ = engine->Initialize();
  backup_engine_ = engine;
}
I
Igor Canadi 已提交
1733 1734 1735 1736 1737 1738

// Releases the backup engine allocated in the constructor.
BackupableDB::~BackupableDB() { delete backup_engine_; }

// Backs up this DB through the engine; returns the engine's initialization
// error instead if that failed.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  return status_.ok()
             ? backup_engine_->CreateNewBackup(this, flush_before_backup)
             : status_;
}

void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1746 1747 1748
  if (!status_.ok()) {
    return;
  }
I
Igor Canadi 已提交
1749 1750 1751
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1752 1753
void
BackupableDB::GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) {
1754 1755 1756
  if (!status_.ok()) {
    return;
  }
H
Hasnain Lakhani 已提交
1757 1758 1759
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

I
Igor Canadi 已提交
1760
// Forwards to the engine's PurgeOldBackups; returns the initialization error
// instead if the engine failed to initialize.
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  if (status_.ok()) {
    return backup_engine_->PurgeOldBackups(num_backups_to_keep);
  }
  return status_;
}

// Forwards to the engine's DeleteBackup; returns the initialization error
// instead if the engine failed to initialize.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  return status_.ok() ? backup_engine_->DeleteBackup(backup_id) : status_;
}

I
Igor Canadi 已提交
1774 1775 1776 1777
// Forwards to the engine's StopBackup. Unlike the other wrappers, this does
// not consult status_.
void BackupableDB::StopBackup() { backup_engine_->StopBackup(); }

H
Hasnain Lakhani 已提交
1778
// Forwards to the engine's GarbageCollect; returns the initialization error
// instead if the engine failed to initialize.
Status BackupableDB::GarbageCollect() {
  if (status_.ok()) {
    return backup_engine_->GarbageCollect();
  }
  return status_;
}

I
Igor Canadi 已提交
1785 1786 1787
// --- RestoreBackupableDB methods ------

// Creates a backup engine on `db_env` for restore/maintenance operations.
// The engine's Initialize() result is kept in status_; methods that check it
// return it (or do nothing) when it is not OK. The engine is deleted in the
// destructor.
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                         const BackupableDBOptions& options) {
  BackupEngineImpl* engine = new BackupEngineImpl(db_env, options);
  status_ = engine->Initialize();
  backup_engine_ = engine;
}
I
Igor Canadi 已提交
1793 1794 1795 1796 1797 1798 1799

// Releases the backup engine allocated in the constructor.
RestoreBackupableDB::~RestoreBackupableDB() { delete backup_engine_; }

void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1800 1801 1802
  if (!status_.ok()) {
    return;
  }
I
Igor Canadi 已提交
1803 1804 1805
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1806 1807
void RestoreBackupableDB::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
1808 1809 1810
  if (!status_.ok()) {
    return;
  }
H
Hasnain Lakhani 已提交
1811 1812 1813
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

1814 1815 1816
// Restores backup `backup_id` into db_dir/wal_dir via the engine; returns the
// initialization error instead if the engine failed to initialize.
Status RestoreBackupableDB::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  if (status_.ok()) {
    return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                               restore_options);
  }
  return status_;
}

1824 1825 1826
// Restores the most recent backup into db_dir/wal_dir via the engine; returns
// the initialization error instead if the engine failed to initialize.
Status RestoreBackupableDB::RestoreDBFromLatestBackup(
    const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  if (status_.ok()) {
    return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                     restore_options);
  }
  return status_;
}

// Forwards to the engine's PurgeOldBackups; returns the initialization error
// instead if the engine failed to initialize.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return status_.ok() ? backup_engine_->PurgeOldBackups(num_backups_to_keep)
                      : status_;
}

// Forwards to the engine's DeleteBackup; returns the initialization error
// instead if the engine failed to initialize.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  if (status_.ok()) {
    return backup_engine_->DeleteBackup(backup_id);
  }
  return status_;
}

H
Hasnain Lakhani 已提交
1848
// Forwards to the engine's GarbageCollect; returns the initialization error
// instead if the engine failed to initialize.
Status RestoreBackupableDB::GarbageCollect() {
  return status_.ok() ? backup_engine_->GarbageCollect() : status_;
}

I
Igor Canadi 已提交
1855
}  // namespace rocksdb
I
Igor Canadi 已提交
1856 1857

#endif  // ROCKSDB_LITE