backupable_db.cc 60.3 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10 11
#ifndef ROCKSDB_LITE

12
#include "rocksdb/utilities/backupable_db.h"
I
Igor Canadi 已提交
13
#include "db/filename.h"
14
#include "util/channel.h"
I
Igor Canadi 已提交
15
#include "util/coding.h"
L
Lei Jin 已提交
16
#include "util/crc32c.h"
17
#include "util/file_reader_writer.h"
I
Igor Canadi 已提交
18
#include "util/logging.h"
S
sdong 已提交
19
#include "util/string_util.h"
I
Igor Canadi 已提交
20
#include "rocksdb/transaction_log.h"
21
#include "port/port.h"
I
Igor Canadi 已提交
22

L
liuhuahang 已提交
23
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
24
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
25
#endif
I
Igor Canadi 已提交
26 27

#include <inttypes.h>
28
#include <stdlib.h>
I
Igor Canadi 已提交
29 30 31
#include <algorithm>
#include <vector>
#include <map>
32
#include <mutex>
I
Igor Canadi 已提交
33
#include <sstream>
I
Igor Canadi 已提交
34 35
#include <string>
#include <limits>
I
Igor Canadi 已提交
36
#include <atomic>
37 38
#include <future>
#include <thread>
39
#include <unordered_map>
40
#include <unordered_set>
41 42
#include "port/port.h"

I
Igor Canadi 已提交
43 44 45

namespace rocksdb {

L
Lei Jin 已提交
46
class BackupRateLimiter {
I
Igor Canadi 已提交
47
 public:
L
Lei Jin 已提交
48 49
  BackupRateLimiter(Env* env, uint64_t max_bytes_per_second,
                   uint64_t bytes_per_check)
I
Igor Canadi 已提交
50 51 52 53 54 55
      : env_(env),
        max_bytes_per_second_(max_bytes_per_second),
        bytes_per_check_(bytes_per_check),
        micros_start_time_(env->NowMicros()),
        bytes_since_start_(0) {}

56
  // thread safe
I
Igor Canadi 已提交
57
  void ReportAndWait(uint64_t bytes_since_last_call) {
58 59
    std::unique_lock<std::mutex> lk(lock_);

I
Igor Canadi 已提交
60 61 62 63 64 65 66 67 68 69 70 71
    bytes_since_start_ += bytes_since_last_call;
    if (bytes_since_start_ < bytes_per_check_) {
      // not enough bytes to be rate-limited
      return;
    }

    uint64_t now = env_->NowMicros();
    uint64_t interval = now - micros_start_time_;
    uint64_t should_take_micros =
        (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;

    if (should_take_micros > interval) {
72 73
      env_->SleepForMicroseconds(
          static_cast<int>(should_take_micros - interval));
I
Igor Canadi 已提交
74 75 76 77 78 79 80 81 82
      now = env_->NowMicros();
    }
    // reset interval
    micros_start_time_ = now;
    bytes_since_start_ = 0;
  }

 private:
  Env* env_;
83
  std::mutex lock_;
I
Igor Canadi 已提交
84 85 86 87 88 89 90
  uint64_t max_bytes_per_second_;
  uint64_t bytes_per_check_;
  uint64_t micros_start_time_;
  uint64_t bytes_since_start_;
  static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// Bumps the counter of successfully created backups by one.
void BackupStatistics::IncrementNumberSuccessBackup() {
  ++number_success_backup;
}
// Bumps the counter of failed backup attempts by one.
void BackupStatistics::IncrementNumberFailBackup() {
  ++number_fail_backup;
}

// Returns the current value of the successful-backup counter.
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  const uint32_t successes = number_success_backup;
  return successes;
}
// Returns the current value of the failed-backup counter.
uint32_t BackupStatistics::GetNumberFailBackup() const {
  const uint32_t failures = number_fail_backup;
  return failures;
}

// Formats both counters as
//   "# success backup: <n>, # fail backup: <m>"
std::string BackupStatistics::ToString() const {
  // The fixed text is 35 characters and each uint32_t can print as up to 10
  // digits, so the longest result is 55 characters plus the terminator. A
  // 50-byte buffer would silently truncate large counters.
  char result[64];
  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
           GetNumberSuccessBackup(), GetNumberFailBackup());
  return result;
}

I
Igor Canadi 已提交
112
// Writes every backup option to `logger`, one option per log line. Boolean
// options are logged as 0/1 and pointer options as addresses.
void BackupableDBOptions::Dump(Logger* logger) const {
  Log(logger, "               Options.backup_dir: %s", backup_dir.c_str());
  Log(logger, "               Options.backup_env: %p", backup_env);
  Log(logger, "        Options.share_table_files: %d",
      static_cast<int>(share_table_files));
  Log(logger, "                 Options.info_log: %p", info_log);
  Log(logger, "                     Options.sync: %d", static_cast<int>(sync));
  Log(logger, "         Options.destroy_old_data: %d",
      static_cast<int>(destroy_old_data));
  Log(logger, "         Options.backup_log_files: %d",
      static_cast<int>(backup_log_files));
  Log(logger, "        Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
  Log(logger, "       Options.restore_rate_limit: %" PRIu64,
      restore_rate_limit);
  Log(logger, "Options.max_background_operations: %d",
      max_background_operations);
}

I
Igor Canadi 已提交
130 131
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
132
 public:
I
Igor Canadi 已提交
133 134
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
                   bool read_only = false);
I
Igor Canadi 已提交
135
  ~BackupEngineImpl();
I
Igor Sugak 已提交
136 137 138 139
  Status CreateNewBackup(DB* db, bool flush_before_backup = false) override;
  Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
  Status DeleteBackup(BackupID backup_id) override;
  void StopBackup() override {
I
Igor Canadi 已提交
140 141
    stop_backup_.store(true, std::memory_order_release);
  }
I
Igor Sugak 已提交
142 143 144 145 146 147 148 149 150 151
  Status GarbageCollect() override;

  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
  Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override;
  // Convenience wrapper: restores the backup identified by latest_backup_id_
  // by forwarding all arguments to RestoreDBFromBackup().
  Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override {
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir,
                               restore_options);
  }

156 157
  Status Initialize();

I
Igor Canadi 已提交
158
 private:
159 160
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);

L
Lei Jin 已提交
161 162 163 164
  // Immutable description of one backed-up file plus a mutable reference
  // count. Instances are non-copyable.
  struct FileInfo {
    // fname    - file name as stored in the backup
    // sz       - file size in bytes
    // checksum - checksum value recorded for the file
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

    FileInfo(const FileInfo&) = delete;
    FileInfo& operator=(const FileInfo&) = delete;

    // Starts at 0; this struct never changes it itself.
    int refs;
    const std::string filename;
    const uint64_t size;
    const uint32_t checksum_value;
  };

I
Igor Canadi 已提交
174 175 176
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
177 178
        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
        Env* env)
I
Igor Canadi 已提交
179
      : timestamp_(0), size_(0), meta_filename_(meta_filename),
L
Lei Jin 已提交
180
        file_infos_(file_infos), env_(env) {}
I
Igor Canadi 已提交
181

182 183 184
    BackupMeta(const BackupMeta&) = delete;
    BackupMeta& operator=(const BackupMeta&) = delete;

I
Igor Canadi 已提交
185 186 187 188 189 190 191 192 193 194 195
    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
196
    uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
197 198 199 200 201 202
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
203

204
    Status AddFile(std::shared_ptr<FileInfo> file_info);
L
Lei Jin 已提交
205

206
    Status Delete(bool delete_meta = true);
I
Igor Canadi 已提交
207 208 209 210 211

    bool Empty() {
      return files_.empty();
    }

212
    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
213 214 215
      auto it = file_infos_->find(filename);
      if (it == file_infos_->end())
        return nullptr;
216
      return it->second;
217 218
    }

219
    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
I
Igor Canadi 已提交
220 221 222 223 224 225
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

I
Igor Canadi 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
    std::string GetInfoString() {
      std::ostringstream ss;
      ss << "Timestamp: " << timestamp_ << std::endl;
      char human_size[16];
      AppendHumanBytes(size_, human_size, sizeof(human_size));
      ss << "Size: " << human_size << std::endl;
      ss << "Files:" << std::endl;
      for (const auto& file : files_) {
        AppendHumanBytes(file->size, human_size, sizeof(human_size));
        ss << file->filename << ", size " << human_size << ", refs "
           << file->refs << std::endl;
      }
      return ss.str();
    }

I
Igor Canadi 已提交
241 242
   private:
    int64_t timestamp_;
243 244 245
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
246 247 248
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
249 250
    std::vector<std::shared_ptr<FileInfo>> files_;
    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
I
Igor Canadi 已提交
251
    Env* env_;
252

253 254
    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  };  // BackupMeta
I
Igor Canadi 已提交
255 256 257 258 259 260 261 262 263

  // Joins the configured backup directory and `relative_path` with a single
  // "/". `relative_path` must not begin with "/" (checked by assert).
  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.empty() || relative_path[0] != '/');
    std::string absolute(options_.backup_dir);
    absolute += "/";
    absolute += relative_path;
    return absolute;
  }
  // Name of the directory for per-backup files, relative to the backup dir.
  inline std::string GetPrivateDirRel() const {
    return std::string("private");
  }
264 265 266
  // Name of the directory for shared files stored under checksum-qualified
  // names, relative to the backup dir.
  inline std::string GetSharedChecksumDirRel() const {
    return std::string("shared_checksum");
  }
I
Igor Canadi 已提交
267
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
268 269
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
270
    assert(file.size() == 0 || file[0] != '/');
S
sdong 已提交
271
    return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id) +
I
Igor Canadi 已提交
272
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
273
  }
I
Igor Canadi 已提交
274 275
  // Relative path of `file` inside the shared directory:
  //   shared/<file>[.tmp]
  // `file` must not begin with "/".
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
    return "shared/" + file + (tmp ? ".tmp" : "");
  }
279 280 281 282 283 284 285 286 287 288 289
  // Relative path of `file` inside the shared-with-checksum directory:
  //   shared_checksum/<file>[.tmp]
  // `file` must not begin with "/".
  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
                                                  bool tmp = false) const {
    assert(file.empty() || file[0] != '/');
    std::string rel = GetSharedChecksumDirRel();
    rel += "/";
    rel += file;
    if (tmp) {
      rel += ".tmp";
    }
    return rel;
  }
  inline std::string GetSharedFileWithChecksum(const std::string& file,
                                               const uint32_t checksum_value,
                                               const uint64_t file_size) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    return file_copy.insert(file_copy.find_last_of('.'),
S
sdong 已提交
290 291
                            "_" + rocksdb::ToString(checksum_value) + "_" +
                                rocksdb::ToString(file_size));
292 293 294 295 296 297 298 299
  }
  // Removes everything from the first '_' up to (not including) the last
  // '.':
  //   000010_<checksum>_<size>.sst -> 000010.sst
  // A name without any '_' is returned unchanged; previously that case made
  // std::string::erase throw on an out-of-range position.
  inline std::string GetFileFromChecksumFile(const std::string& file) const {
    assert(file.size() == 0 || file[0] != '/');
    const size_t first_underscore = file.find_first_of('_');
    if (first_underscore == std::string::npos) {
      return file;
    }
    std::string file_copy = file;
    return file_copy.erase(first_underscore,
                           file_copy.find_last_of('.') - first_underscore);
  }
I
Igor Canadi 已提交
300 301 302 303 304 305 306
  // Absolute path of the LATEST_BACKUP file, or of its ".tmp" sibling when
  // `tmp` is true.
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    std::string name("LATEST_BACKUP");
    if (tmp) {
      name += ".tmp";
    }
    return GetAbsolutePath(name);
  }
  // Absolute path of the directory holding the per-backup meta files.
  inline std::string GetBackupMetaDir() const {
    const std::string meta_dir_rel("meta");
    return GetAbsolutePath(meta_dir_rel);
  }
  // Absolute path of the meta file of `backup_id`: <meta dir>/<backup_id>.
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
    return GetBackupMetaDir() + "/" + rocksdb::ToString(backup_id);
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
L
Lei Jin 已提交
318
                  BackupRateLimiter* rate_limiter,
I
Igor Canadi 已提交
319
                  uint64_t* size = nullptr,
L
Lei Jin 已提交
320
                  uint32_t* checksum_value = nullptr,
I
Igor Canadi 已提交
321
                  uint64_t size_limit = 0);
L
Lei Jin 已提交
322 323 324 325 326 327

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

328 329 330 331 332 333 334 335 336 337 338 339 340 341
  // Outcome of one background file copy, handed from a worker thread to the
  // thread waiting on the matching future.
  struct CopyResult {
    uint64_t size;            // filled in through CopyFile()'s `size` out-param
    uint32_t checksum_value;  // filled in through CopyFile()'s `checksum_value`
                              // out-param
    Status status;            // return value of CopyFile()
  };
  struct CopyWorkItem {
    std::string src_path;
    std::string dst_path;
    Env* src_env;
    Env* dst_env;
    bool sync;
    BackupRateLimiter* rate_limiter;
    uint64_t size_limit;
    std::promise<CopyResult> result;
342

343
    CopyWorkItem() {}
344 345 346
    CopyWorkItem(const CopyWorkItem&) = delete;
    CopyWorkItem& operator=(const CopyWorkItem&) = delete;

D
Dmitri Smirnov 已提交
347
    CopyWorkItem(CopyWorkItem&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); }
348

D
Dmitri Smirnov 已提交
349
    CopyWorkItem& operator=(CopyWorkItem&& o) ROCKSDB_NOEXCEPT {
350 351 352 353 354 355 356 357 358 359 360
      src_path = std::move(o.src_path);
      dst_path = std::move(o.dst_path);
      src_env = o.src_env;
      dst_env = o.dst_env;
      sync = o.sync;
      rate_limiter = o.rate_limiter;
      size_limit = o.size_limit;
      result = std::move(o.result);
      return *this;
    }

361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
    CopyWorkItem(std::string _src_path,
                 std::string _dst_path,
                 Env* _src_env,
                 Env* _dst_env,
                 bool _sync,
                 BackupRateLimiter* _rate_limiter,
                 uint64_t _size_limit)
        : src_path(std::move(_src_path)),
          dst_path(std::move(_dst_path)),
          src_env(_src_env),
          dst_env(_dst_env),
          sync(_sync),
          rate_limiter(_rate_limiter),
          size_limit(_size_limit) {}
  };

  struct BackupAfterCopyWorkItem {
    std::future<CopyResult> result;
    bool shared;
    bool needed_to_copy;
    Env* backup_env;
    std::string dst_path_tmp;
    std::string dst_path;
    std::string dst_relative;
    BackupAfterCopyWorkItem() {}
386

D
Dmitri Smirnov 已提交
387
    BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT {
388 389 390
      *this = std::move(o);
    }

D
Dmitri Smirnov 已提交
391
    BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT {
392 393 394 395 396 397 398 399 400 401
      result = std::move(o.result);
      shared = o.shared;
      needed_to_copy = o.needed_to_copy;
      backup_env = o.backup_env;
      dst_path_tmp = std::move(o.dst_path_tmp);
      dst_path = std::move(o.dst_path);
      dst_relative = std::move(o.dst_relative);
      return *this;
    }

S
sdong 已提交
402 403 404
    BackupAfterCopyWorkItem(std::future<CopyResult>&& _result, bool _shared,
                            bool _needed_to_copy, Env* _backup_env,
                            std::string _dst_path_tmp, std::string _dst_path,
405 406 407 408 409 410 411 412 413 414 415 416 417 418
                            std::string _dst_relative)
        : result(std::move(_result)),
          shared(_shared),
          needed_to_copy(_needed_to_copy),
          backup_env(_backup_env),
          dst_path_tmp(std::move(_dst_path_tmp)),
          dst_path(std::move(_dst_path)),
          dst_relative(std::move(_dst_relative)) {}
  };

  struct RestoreAfterCopyWorkItem {
    std::future<CopyResult> result;
    uint32_t checksum_value;
    RestoreAfterCopyWorkItem() {}
419
    RestoreAfterCopyWorkItem(std::future<CopyResult>&& _result,
420
                             uint32_t _checksum_value)
S
sdong 已提交
421
        : result(std::move(_result)), checksum_value(_checksum_value) {}
D
Dmitri Smirnov 已提交
422
    RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT {
423 424 425
      *this = std::move(o);
    }

D
Dmitri Smirnov 已提交
426
    RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT {
427 428 429 430
      result = std::move(o.result);
      checksum_value = o.checksum_value;
      return *this;
    }
431 432
  };

433
  bool initialized_;
434 435 436 437 438 439 440 441 442 443 444 445 446 447
  channel<CopyWorkItem> files_to_copy_;
  std::vector<std::thread> threads_;

  Status AddBackupFileWorkItem(
          std::unordered_set<std::string>& live_dst_paths,
          std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
          BackupID backup_id,
          bool shared,
          const std::string& src_dir,
          const std::string& src_fname,  // starts with "/"
          BackupRateLimiter* rate_limiter,
          uint64_t size_limit = 0,
          bool shared_checksum = false);

I
Igor Canadi 已提交
448 449
  // backup state data
  BackupID latest_backup_id_;
450 451 452 453 454
  std::map<BackupID, unique_ptr<BackupMeta>> backups_;
  std::map<BackupID,
           std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
  std::unordered_map<std::string,
                     std::shared_ptr<FileInfo>> backuped_file_infos_;
I
Igor Canadi 已提交
455
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
456 457 458 459 460 461

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

I
Igor Canadi 已提交
462 463 464 465 466 467
  // directories
  unique_ptr<Directory> backup_directory_;
  unique_ptr<Directory> shared_directory_;
  unique_ptr<Directory> meta_directory_;
  unique_ptr<Directory> private_directory_;

I
Igor Canadi 已提交
468 469
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
I
Igor Canadi 已提交
470
  bool read_only_;
471
  BackupStatistics backup_statistics_;
I
Igor Canadi 已提交
472 473
};

474
// Creates and initializes a backup engine.
// On success *backup_engine_ptr receives a heap-allocated engine that the
// caller owns, and OK is returned. If initialization fails,
// *backup_engine_ptr is set to nullptr, the partially built engine is
// destroyed and the failing status is returned.
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
                          BackupEngine** backup_engine_ptr) {
  std::unique_ptr<BackupEngineImpl> backup_engine(
      new BackupEngineImpl(env, options));
  auto s = backup_engine->Initialize();
  if (!s.ok()) {
    *backup_engine_ptr = nullptr;
    return s;
  }
  *backup_engine_ptr = backup_engine.release();
  return Status::OK();
}

I
Igor Canadi 已提交
487
// Stores the configuration only; directories, backup metadata and worker
// threads are set up by Initialize().
// The backup environment is options.backup_env when set, otherwise db_env.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options,
                                   bool read_only)
    : initialized_(false),
      // Defined value until Initialize() determines the real latest backup.
      latest_backup_id_(0),
      stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {}

// Shuts the engine down: marks the work channel as finished, waits for
// every worker thread to exit, then flushes the info log.
BackupEngineImpl::~BackupEngineImpl() {
  files_to_copy_.sendEof();
  for (size_t i = 0; i < threads_.size(); ++i) {
    threads_[i].join();
  }
  LogFlush(options_.info_log);
}
505

506 507 508
// One-time setup of the engine; must be called exactly once before any other
// operation (checked by assert). In order, it:
//   1. creates and opens the backup directories (skipped when read-only),
//   2. discovers existing backups from the names of the meta files,
//   3. either wipes everything (destroy_old_data) or loads each backup's
//      meta file, moving the ones that fail to load into corrupt_backups_,
//   4. determines latest_backup_id_ and drops backups with a larger id,
//   5. rewrites the latest-backup file (skipped when read-only),
//   6. starts max_background_operations worker threads that serve
//      files_to_copy_.
// Returns the first failing status from the directory, listing, purge or
// latest-backup-file steps; failures while deleting individual stale
// backups are only logged.
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
  if (read_only_) {
    Log(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // gather the list of directories that we need to create
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
      }
    }
  }

  std::vector<std::string> backup_meta_files;
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
    if (!s.ok()) {
      return s;
    }
  }
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    if (file == "." || file == "..") {
      continue;
    }
    Log(options_.info_log, "Detected backup %s", file.c_str());
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    // A valid meta file name is exactly the decimal form of a non-zero id;
    // the round trip through ToString rejects names such as "7abc" or "007".
    if (backup_id == 0 || file != rocksdb::ToString(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
        Log(options_.info_log, "Unrecognized meta file %s, deleting -- %s",
            file.c_str(), s.ToString().c_str());
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    // make_pair already yields a temporary; no std::move needed around it.
    backups_.insert(
        std::make_pair(backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                                      GetBackupMetaFile(backup_id),
                                      &backuped_file_infos_, backup_env_))));
  }

  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    Log(options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
    // start from beginning
    latest_backup_id_ = 0;
  } else {  // Load data from storage
    // load the backups if any
    for (auto& backup : backups_) {
      Status s = backup.second->LoadFromFile(options_.backup_dir);
      if (!s.ok()) {
        Log(options_.info_log, "Backup %u corrupted -- %s", backup.first,
            s.ToString().c_str());
        // Ownership of the meta object moves to corrupt_backups_; the now
        // empty entry is removed from backups_ right after this loop.
        corrupt_backups_.insert(std::make_pair(
              backup.first, std::make_pair(s, std::move(backup.second))));
      } else {
        Log(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
            backup.first, backup.second->GetInfoString().c_str());
      }
    }

    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(backups_.find(corrupt.first));
    }

    Status s = GetLatestBackupFileContents(&latest_backup_id_);

    // If latest backup file is corrupted or non-existent
    // set latest backup as the biggest backup we have
    // or 0 if we have no backups
    if (!s.ok() ||
        backups_.find(latest_backup_id_) == backups_.end()) {
      auto itr = backups_.end();
      latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
    }
  }

  Log(options_.info_log, "Latest backup is %u", latest_backup_id_);

  // delete any backups that claim to be later than latest
  // (ids are collected first because deleting modifies backups_)
  std::vector<BackupID> later_ids;
  for (auto itr = backups_.lower_bound(latest_backup_id_ + 1);
       itr != backups_.end(); itr++) {
    Log(options_.info_log,
        "Found backup claiming to be later than latest: %" PRIu32, itr->first);
    later_ids.push_back(itr->first);
  }
  for (auto id : later_ids) {
    Status s;
    if (!read_only_) {
      s = DeleteBackup(id);
    } else {
      auto backup = backups_.find(id);
      // We just found it couple of lines earlier!
      assert(backup != backups_.end());
      // Read-only engines must not touch the meta file on disk.
      s = backup->second->Delete(false);
      backups_.erase(backup);
    }
    if (!s.ok()) {
      Log(options_.info_log, "Failed deleting backup %" PRIu32 " -- %s", id,
          s.ToString().c_str());
    }
  }

  if (!read_only_) {
    auto s = PutLatestBackupFileContents(latest_backup_id_);
    if (!s.ok()) {
      return s;
    }
  }

  // set up threads perform copies from files_to_copy_ in the background
  // The lambdas use `this`; the destructor joins every thread, so the engine
  // outlives them.
  for (int t = 0; t < options_.max_background_operations; t++) {
    threads_.emplace_back([this]() {
      CopyWorkItem work_item;
      while (files_to_copy_.read(work_item)) {
        CopyResult result;
        // Defined values in case CopyFile fails before writing its outputs.
        result.size = 0;
        result.checksum_value = 0;
        result.status = CopyFile(work_item.src_path,
                                 work_item.dst_path,
                                 work_item.src_env,
                                 work_item.dst_env,
                                 work_item.sync,
                                 work_item.rate_limiter,
                                 &result.size,
                                 &result.checksum_value,
                                 work_item.size_limit);
        work_item.result.set_value(std::move(result));
      }
    });
  }

  Log(options_.info_log, "Initialized BackupEngine");

  return Status::OK();
}
I
Igor Canadi 已提交
678

I
Igor Canadi 已提交
679
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
680
  assert(initialized_);
I
Igor Canadi 已提交
681
  assert(!read_only_);
I
Igor Canadi 已提交
682 683 684 685
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;
686
  uint64_t sequence_number = db->GetLatestSequenceNumber();
I
Igor Canadi 已提交
687 688 689 690 691 692 693

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
694
  if (s.ok() && !flush_before_backup && options_.backup_log_files) {
I
Igor Canadi 已提交
695 696 697 698
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
699
    db->EnableFileDeletions(false);
I
Igor Canadi 已提交
700 701 702 703 704
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
705 706 707 708
  auto ret = backups_.insert(std::move(
      std::make_pair(new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                                        GetBackupMetaFile(new_backup_id),
                                        &backuped_file_infos_, backup_env_)))));
I
Igor Canadi 已提交
709 710
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
711 712
  new_backup->RecordTimestamp();
  new_backup->SetSequenceNumber(sequence_number);
I
Igor Canadi 已提交
713

714 715
  auto start_backup = backup_env_-> NowMicros();

I
Igor Canadi 已提交
716 717 718
  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

I
Igor Canadi 已提交
719 720 721
  // create temporary private dir
  s = backup_env_->CreateDir(
      GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
I
Igor Canadi 已提交
722

L
Lei Jin 已提交
723
  unique_ptr<BackupRateLimiter> rate_limiter;
I
Igor Canadi 已提交
724 725
  if (options_.backup_rate_limit > 0) {
    copy_file_buffer_size_ = options_.backup_rate_limit / 10;
L
Lei Jin 已提交
726 727
    rate_limiter.reset(new BackupRateLimiter(db_env_,
          options_.backup_rate_limit, copy_file_buffer_size_));
I
Igor Canadi 已提交
728 729
  }

730 731 732 733 734 735 736 737 738
  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live files shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;
  live_dst_paths.reserve(live_files.size() + live_wal_files.size());

  std::vector<BackupAfterCopyWorkItem> backup_items_to_finish;
  // Add a CopyWorkItem to the channel for each live file
I
Igor Canadi 已提交
739 740 741 742
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
I
Igor Canadi 已提交
743 744 745 746
    if (!ok) {
      assert(false);
      return Status::Corruption("Can't parse file name. This is very bad");
    }
I
Igor Canadi 已提交
747
    // we should only get sst, manifest and current files here
I
Igor Canadi 已提交
748 749
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile);
I
Igor Canadi 已提交
750 751

    // rules:
752
    // * if it's kTableFile, then it's shared
I
Igor Canadi 已提交
753
    // * if it's kDescriptorFile, limit the size to manifest_file_size
754 755 756 757 758 759 760 761 762 763 764 765
    s = AddBackupFileWorkItem(
            live_dst_paths,
            backup_items_to_finish,
            new_backup_id,
            options_.share_table_files && type == kTableFile,
            db->GetName(),
            live_files[i],
            rate_limiter.get(),
            (type == kDescriptorFile) ? manifest_file_size : 0,
            options_.share_files_with_checksum && type == kTableFile);
  }
  // Add a CopyWorkItem to the channel for each WAL file
I
Igor Canadi 已提交
766 767 768 769
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into backup_dir/files/<new backup>/
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
      s = AddBackupFileWorkItem(live_dst_paths,
                                backup_items_to_finish,
                                new_backup_id,
                                false, /* not shared */
                                db->GetOptions().wal_dir,
                                live_wal_files[i]->PathName(),
                                rate_limiter.get());
    }
  }

  Status item_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    if (item_status.ok() && item.shared && item.needed_to_copy) {
      item_status = item.backup_env->RenameFile(item.dst_path_tmp,
                                                item.dst_path);
    }
    if (item_status.ok()) {
      item_status = new_backup.get()->AddFile(
              std::make_shared<FileInfo>(item.dst_relative,
                                         result.size,
                                         result.checksum_value));
    }
    if (!item_status.ok()) {
      s = item_status;
I
Igor Canadi 已提交
797 798 799 800
    }
  }

  // we copied all the files, enable file deletions
801
  db->EnableFileDeletions(false);
I
Igor Canadi 已提交
802

I
Igor Canadi 已提交
803 804
  if (s.ok()) {
    // move tmp private backup to real backup folder
I
Igor Canadi 已提交
805 806 807 808
    Log(options_.info_log,
        "Moving tmp backup directory to the real one: %s -> %s\n",
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)).c_str(),
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)).c_str());
I
Igor Canadi 已提交
809
    s = backup_env_->RenameFile(
810
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)),  // tmp
I
Igor Canadi 已提交
811 812 813
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
  }

814 815
  auto backup_time = backup_env_->NowMicros() - start_backup;

I
Igor Canadi 已提交
816 817
  if (s.ok()) {
    // persist the backup metadata on the disk
818
    s = new_backup->StoreToFile(options_.sync);
I
Igor Canadi 已提交
819 820 821 822 823
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
I
Igor Canadi 已提交
824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
  if (s.ok() && options_.sync) {
    unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      backup_private_directory->Fsync();
    }
    if (private_directory_ != nullptr) {
      private_directory_->Fsync();
    }
    if (meta_directory_ != nullptr) {
      meta_directory_->Fsync();
    }
    if (shared_directory_ != nullptr) {
      shared_directory_->Fsync();
    }
    if (backup_directory_ != nullptr) {
      backup_directory_->Fsync();
    }
  }

846 847 848
  if (s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
  }
I
Igor Canadi 已提交
849
  if (!s.ok()) {
850
    backup_statistics_.IncrementNumberFailBackup();
I
Igor Canadi 已提交
851 852
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
853 854
    Log(options_.info_log, "Backup Statistics %s\n",
        backup_statistics_.ToString().c_str());
I
Igor Canadi 已提交
855 856 857
    // delete files that we might have already written
    DeleteBackup(new_backup_id);
    GarbageCollect();
I
Igor Canadi 已提交
858 859 860 861 862 863 864
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");
865 866

  // backup_speed is in byte/second
867
  double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
868
  Log(options_.info_log, "Backup number of files: %u",
869
      new_backup->GetNumberFiles());
I
Igor Canadi 已提交
870 871 872
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  Log(options_.info_log, "Backup size: %s", human_size);
I
Igor Canadi 已提交
873
  Log(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time);
874 875 876
  Log(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  Log(options_.info_log, "Backup Statistics %s",
      backup_statistics_.ToString().c_str());
I
Igor Canadi 已提交
877 878 879
  return s;
}

I
Igor Canadi 已提交
880
// Deletes backups, taken in backups_ iteration order, until at most
// num_backups_to_keep remain. Returns the first non-OK status reported by
// DeleteBackup(), or OK if every deletion succeeded.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(initialized_);
  assert(!read_only_);
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);

  // Collect the ids up front: DeleteBackup() erases entries from backups_, so
  // we must not be iterating over the container while calling it.
  std::vector<BackupID> doomed_ids;
  for (auto it = backups_.begin();
       (backups_.size() - doomed_ids.size()) > num_backups_to_keep; it++) {
    doomed_ids.push_back(it->first);
  }

  for (auto id : doomed_ids) {
    auto status = DeleteBackup(id);
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}

I
Igor Canadi 已提交
900
// Removes the backup identified by backup_id, whether it is tracked in
// backups_ or in corrupt_backups_. Afterwards every file whose reference count
// dropped to zero is deleted from the backup environment (best effort, the
// outcome is only logged) and the backup's private directory is removed (also
// best effort).
//
// Returns NotFound if the id is unknown, the error from BackupMeta::Delete()
// if that fails, and OK otherwise.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  assert(initialized_);
  assert(!read_only_);
  Log(options_.info_log, "Deleting backup %u", backup_id);

  auto live_it = backups_.find(backup_id);
  if (live_it == backups_.end()) {
    // Not a healthy backup; it may still be tracked as a corrupt one.
    auto corrupt_it = corrupt_backups_.find(backup_id);
    if (corrupt_it == corrupt_backups_.end()) {
      return Status::NotFound("Backup not found");
    }
    auto delete_status = corrupt_it->second.second->Delete();
    if (!delete_status.ok()) {
      return delete_status;
    }
    corrupt_backups_.erase(corrupt_it);
  } else {
    auto delete_status = live_it->second->Delete();
    if (!delete_status.ok()) {
      return delete_status;
    }
    backups_.erase(live_it);
  }

  // Drop every file that is no longer referenced by any backup. Names are
  // gathered first so the map is not modified while being iterated.
  std::vector<std::string> unreferenced;
  for (auto& entry : backuped_file_infos_) {
    if (entry.second->refs != 0) {
      continue;
    }
    Status s = backup_env_->DeleteFile(GetAbsolutePath(entry.first));
    Log(options_.info_log, "Deleting %s -- %s", entry.first.c_str(),
        s.ToString().c_str());
    unreferenced.push_back(entry.first);
  }
  for (auto& name : unreferenced) {
    backuped_file_infos_.erase(name);
  }

  // take care of private dirs -- GarbageCollect() will take care of them
  // if they are not empty
  std::string private_dir = GetPrivateFileRel(backup_id);
  Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
  Log(options_.info_log, "Deleting private dir %s -- %s",
      private_dir.c_str(), s.ToString().c_str());
  return Status::OK();
}

I
Igor Canadi 已提交
945
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
946
  assert(initialized_);
I
Igor Canadi 已提交
947 948
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
949
    if (!backup.second->Empty()) {
950
        backup_info->push_back(BackupInfo(
951 952 953
            backup.first, backup.second->GetTimestamp(),
            backup.second->GetSize(),
            backup.second->GetNumberFiles()));
I
Igor Canadi 已提交
954 955 956 957
    }
  }
}

H
Hasnain Lakhani 已提交
958 959 960
void
BackupEngineImpl::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
961
  assert(initialized_);
H
Hasnain Lakhani 已提交
962 963 964 965 966 967
  corrupt_backup_ids->reserve(corrupt_backups_.size());
  for (auto& backup : corrupt_backups_) {
    corrupt_backup_ids->push_back(backup.first);
  }
}

968 969 970
// Restores backup `backup_id` into db_dir / wal_dir.
//
// Steps:
//   1. Reject corrupt backups (returning the stored status) and unknown or
//      empty backups (NotFound).
//   2. Clear the destination. With restore_options.keep_log_files, log files
//      in db_dir are kept and archived logs are moved back into wal_dir;
//      otherwise wal_dir, its archive directory and db_dir are emptied.
//   3. Queue one CopyWorkItem per backed-up file on files_to_copy_.
//   4. Wait for the copies and compare each resulting checksum with the one
//      recorded in the backup; stop at the first failure.
Status BackupEngineImpl::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  assert(initialized_);

  auto corrupt_it = corrupt_backups_.find(backup_id);
  if (corrupt_it != corrupt_backups_.end()) {
    return corrupt_it->second.first;
  }
  auto found = backups_.find(backup_id);
  if (found == backups_.end() || found->second->Empty()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = found->second;

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);
  Log(options_.info_log, "keep_log_files: %d\n",
      static_cast<int>(restore_options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  if (!restore_options.keep_log_files) {
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  } else {
    // delete files in db_dir, but keep all the log files
    DeleteChildren(db_dir, 1 << kLogFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archived;
    db_env_->GetChildren(archive_dir, &archived);  // ignore errors
    for (const auto& name : archived) {
      uint64_t number;
      FileType type;
      if (!ParseFileName(name, &number, &type) || type != kLogFile) {
        continue;
      }
      Log(options_.info_log, "Moving log file from archive/ to wal_dir: %s",
          name.c_str());
      Status move_status =
          db_env_->RenameFile(archive_dir + "/" + name, wal_dir + "/" + name);
      if (!move_status.ok()) {
        // if we can't move log file from archive_dir to wal_dir,
        // we should fail, since it might mean data loss
        return move_status;
      }
    }
  }

  unique_ptr<BackupRateLimiter> rate_limiter;
  if (options_.restore_rate_limit > 0) {
    copy_file_buffer_size_ = options_.restore_rate_limit / 10;
    rate_limiter.reset(new BackupRateLimiter(db_env_,
          options_.restore_rate_limit, copy_file_buffer_size_));
  }

  Status s;
  std::vector<RestoreAfterCopyWorkItem> restore_items_to_finish;
  for (const auto& file_info : backup->GetFiles()) {
    const std::string& file = file_info->filename;

    // 1. extract the filename
    // file will either be shared/<file>, shared_checksum/<file_crc32_size>
    // or private/<number>/<file>
    size_t slash = file.find_last_of('/');
    assert(slash != std::string::npos);
    std::string dst = file.substr(slash + 1);

    // if the file was in shared_checksum, extract the real file name
    // in this case the file is <number>_<checksum>_<size>.<type>
    if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
      dst = GetFileFromChecksumFile(dst);
    }

    // 2. find the filetype
    uint64_t number;
    FileType type;
    if (!ParseFileName(dst, &number, &type)) {
      return Status::Corruption("Backup corrupted");
    }

    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    const std::string& target_dir = (type == kLogFile) ? wal_dir : db_dir;
    dst = target_dir + "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    CopyWorkItem copy_work_item(GetAbsolutePath(file),
                                dst,
                                backup_env_,
                                db_env_,
                                false,
                                rate_limiter.get(),
                                0 /* size_limit */);
    RestoreAfterCopyWorkItem after_copy_work_item(
            copy_work_item.result.get_future(),
            file_info->checksum_value);
    files_to_copy_.write(std::move(copy_work_item));
    restore_items_to_finish.push_back(std::move(after_copy_work_item));
  }

  // Note: a copy may both fail and produce a mismatching checksum, but only
  // one status is returned; the copy status takes precedence.
  for (auto& item : restore_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    Status item_status = result.status;
    if (!item_status.ok()) {
      s = item_status;
      break;
    }
    if (item.checksum_value != result.checksum_value) {
      s = Status::Corruption("Checksum check failed");
      break;
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}

// latest backup id is an ASCII representation of latest backup id
I
Igor Canadi 已提交
1093
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
I
Igor Canadi 已提交
1094 1095 1096 1097 1098 1099 1100 1101 1102
  Status s;
  unique_ptr<SequentialFile> file;
  s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
                                     &file,
                                     EnvOptions());
  if (!s.ok()) {
    return s;
  }

1103 1104
  char buf[11];
  Slice data;
1105 1106 1107 1108
  unique_ptr<SequentialFileReader> file_reader(
      new SequentialFileReader(std::move(file)));

  s = file_reader->Read(10, &data, buf);
I
Igor Canadi 已提交
1109 1110 1111
  if (!s.ok() || data.size() == 0) {
    return s.ok() ? Status::Corruption("Latest backup file corrupted") : s;
  }
1112
  buf[data.size()] = 0;
I
Igor Canadi 已提交
1113

1114
  *latest_backup = 0;
I
Igor Canadi 已提交
1115
  sscanf(data.data(), "%u", latest_backup);
A
agiardullo 已提交
1116 1117 1118

  s = backup_env_->FileExists(GetBackupMetaFile(*latest_backup));
  if (s.IsNotFound()) {
I
Igor Canadi 已提交
1119 1120
    s = Status::Corruption("Latest backup file corrupted");
  }
A
agiardullo 已提交
1121
  return s;
I
Igor Canadi 已提交
1122 1123 1124 1125 1126 1127
}

// this operation HAS to be atomic
// writing 4 bytes to the file is atomic alright, but we should *never*
// do something like 1. delete file, 2. write new file
// We write to a tmp file and then atomically rename
I
Igor Canadi 已提交
1128
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
I
Igor Canadi 已提交
1129
  assert(!read_only_);
I
Igor Canadi 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141
  Status s;
  unique_ptr<WritableFile> file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = backup_env_->NewWritableFile(GetLatestBackupFile(true),
                                   &file,
                                   env_options);
  if (!s.ok()) {
    backup_env_->DeleteFile(GetLatestBackupFile(true));
    return s;
  }

1142 1143
  unique_ptr<WritableFileWriter> file_writer(
      new WritableFileWriter(std::move(file), env_options));
1144
  char file_contents[10];
1145 1146
  int len =
      snprintf(file_contents, sizeof(file_contents), "%u\n", latest_backup);
1147
  s = file_writer->Append(Slice(file_contents, len));
I
Igor Canadi 已提交
1148
  if (s.ok() && options_.sync) {
1149
    file_writer->Sync(false);
I
Igor Canadi 已提交
1150 1151
  }
  if (s.ok()) {
1152
    s = file_writer->Close();
I
Igor Canadi 已提交
1153 1154 1155 1156 1157 1158 1159 1160 1161
  }
  if (s.ok()) {
    // atomically replace real file with new tmp
    s = backup_env_->RenameFile(GetLatestBackupFile(true),
                                GetLatestBackupFile(false));
  }
  return s;
}

L
Lei Jin 已提交
1162 1163 1164 1165 1166 1167 1168
// Copies up to `size_limit` bytes of `src` (opened through src_env) into `dst`
// (created through dst_env).
//
// Parameters:
//   sync           - if true, the destination is synced once copying is done.
//   rate_limiter   - optional; told about every chunk written.
//   size           - optional out-param; receives the number of bytes read.
//   checksum_value - optional out-param; receives the crc32c of the bytes
//                    read.
//   size_limit     - maximum number of bytes to copy; 0 means "no limit".
//
// Returns Incomplete if stop_backup_ is set while copying, otherwise the first
// error from opening, reading, appending, syncing or closing.
Status BackupEngineImpl::CopyFile(
    const std::string& src,
    const std::string& dst, Env* src_env,
    Env* dst_env, bool sync,
    BackupRateLimiter* rate_limiter, uint64_t* size,
    uint32_t* checksum_value,
    uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  env_options.use_os_buffer = false;
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  unique_ptr<WritableFileWriter> dest_writer(
      new WritableFileWriter(std::move(dst_file), env_options));
  unique_ptr<SequentialFileReader> src_reader(
      new SequentialFileReader(std::move(src_file)));
  unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    // Never ask for more than the remaining size budget.
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_reader->Read(buffer_to_read, &data, buf.get());
    size_limit -= data.size();

    if (!s.ok()) {
      return s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
                                       data.size());
    }
    s = dest_writer->Append(data);
    if (rate_limiter != nullptr) {
      rate_limiter->ReportAndWait(data.size());
    }
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dest_writer->Sync(false);
  }
  if (s.ok()) {
    // Close explicitly so that an error while writing out the tail of the
    // file is reported to the caller instead of being left to the writer's
    // destructor, which has no way to return it.
    s = dest_writer->Close();
  }

  return s;
}

// src_fname will always start with "/"
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
Status BackupEngineImpl::AddBackupFileWorkItem(
        std::unordered_set<std::string>& live_dst_paths,
        std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
        BackupID backup_id,
        bool shared,
        const std::string& src_dir,
        const std::string& src_fname,
        BackupRateLimiter* rate_limiter,
        uint64_t size_limit,
        bool shared_checksum) {
I
Igor Canadi 已提交
1246 1247
  assert(src_fname.size() > 0 && src_fname[0] == '/');
  std::string dst_relative = src_fname.substr(1);
I
Igor Canadi 已提交
1248
  std::string dst_relative_tmp;
1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269
  Status s;
  uint64_t size;
  uint32_t checksum_value = 0;

  if (shared && shared_checksum) {
    // add checksum and file length to the file name
    s = CalculateChecksum(src_dir + src_fname,
                          db_env_,
                          size_limit,
                          &checksum_value);
    if (s.ok()) {
        s = db_env_->GetFileSize(src_dir + src_fname, &size);
    }
    if (!s.ok()) {
         return s;
    }
    dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_value,
                                             size);
    dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
    dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  } else if (shared) {
I
Igor Canadi 已提交
1270 1271
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
I
Igor Canadi 已提交
1272
  } else {
I
Igor Canadi 已提交
1273 1274
    dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
I
Igor Canadi 已提交
1275 1276
  }
  std::string dst_path = GetAbsolutePath(dst_relative);
I
Igor Canadi 已提交
1277
  std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);
I
Igor Canadi 已提交
1278

1279 1280
  // if it's shared, we also need to check if it exists -- if it does, no need
  // to copy it again.
I
Igor Canadi 已提交
1281
  bool need_to_copy = true;
1282 1283 1284
  // true if dst_path is the same path as another live file
  const bool same_path =
      live_dst_paths.find(dst_path) != live_dst_paths.end();
A
agiardullo 已提交
1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299

  bool file_exists = false;
  if (shared && !same_path) {
    Status exist = backup_env_->FileExists(dst_path);
    if (exist.ok()) {
      file_exists = true;
    } else if (exist.IsNotFound()) {
      file_exists = false;
    } else {
      assert(s.IsIOError());
      return exist;
    }
  }

  if (shared && (same_path || file_exists)) {
I
Igor Canadi 已提交
1300
    need_to_copy = false;
1301 1302 1303 1304
    if (shared_checksum) {
      Log(options_.info_log,
          "%s already present, with checksum %u and size %" PRIu64,
          src_fname.c_str(), checksum_value, size);
I
Igor Canadi 已提交
1305
    } else if (backuped_file_infos_.find(dst_relative) ==
1306
               backuped_file_infos_.end() && !same_path) {
I
Igor Canadi 已提交
1307 1308 1309 1310 1311 1312 1313 1314
      // file already exists, but it's not referenced by any backup. overwrite
      // the file
      Log(options_.info_log,
          "%s already present, but not referenced by any backup. We will "
          "overwrite the file.",
          src_fname.c_str());
      need_to_copy = true;
      backup_env_->DeleteFile(dst_path);
1315
    } else {
I
Igor Canadi 已提交
1316 1317
      // the file is present and referenced by a backup
      db_env_->GetFileSize(src_dir + src_fname, &size);  // Ignore error
1318 1319
      Log(options_.info_log, "%s already present, calculate checksum",
          src_fname.c_str());
I
Igor Canadi 已提交
1320
      s = CalculateChecksum(src_dir + src_fname, db_env_, size_limit,
1321 1322
                            &checksum_value);
    }
I
Igor Canadi 已提交
1323
  }
1324 1325
  live_dst_paths.insert(dst_path);

I
Igor Canadi 已提交
1326
  if (need_to_copy) {
I
Igor Canadi 已提交
1327
    Log(options_.info_log, "Copying %s to %s", src_fname.c_str(),
1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361
            dst_path_tmp.c_str());
    CopyWorkItem copy_work_item(src_dir + src_fname,
                                dst_path_tmp,
                                db_env_,
                                backup_env_,
                                options_.sync,
                                rate_limiter,
                                size_limit);
    BackupAfterCopyWorkItem after_copy_work_item(
            copy_work_item.result.get_future(),
            shared,
            need_to_copy,
            backup_env_,
            dst_path_tmp,
            dst_path,
            dst_relative);
    files_to_copy_.write(std::move(copy_work_item));
    backup_items_to_finish.push_back(std::move(after_copy_work_item));
  } else {
    std::promise<CopyResult> promise_result;
    BackupAfterCopyWorkItem after_copy_work_item(
            promise_result.get_future(),
            shared,
            need_to_copy,
            backup_env_,
            dst_path_tmp,
            dst_path,
            dst_relative);
    backup_items_to_finish.push_back(std::move(after_copy_work_item));
    CopyResult result;
    result.status = s;
    result.size = size;
    result.checksum_value = checksum_value;
    promise_result.set_value(std::move(result));
L
Lei Jin 已提交
1362 1363 1364 1365
  }
  return s;
}

I
Igor Canadi 已提交
1366 1367 1368
// Computes the crc32c of the first `size_limit` bytes of `src` (the whole file
// when size_limit is 0) and stores it in *checksum_value.
//
// Returns Incomplete if stop_backup_ is set while reading, or the error from
// opening/reading the file; OK otherwise.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;
  // A limit of zero means "unlimited".
  uint64_t remaining = (size_limit == 0)
                           ? std::numeric_limits<uint64_t>::max()
                           : size_limit;

  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  env_options.use_os_buffer = false;

  std::unique_ptr<SequentialFile> src_file;
  Status s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (!s.ok()) {
    return s;
  }

  unique_ptr<SequentialFileReader> src_reader(
      new SequentialFileReader(std::move(src_file)));
  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice chunk;

  for (;;) {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    size_t to_read = (copy_file_buffer_size_ < remaining) ?
      copy_file_buffer_size_ : remaining;
    s = src_reader->Read(to_read, &chunk, buf.get());
    if (!s.ok()) {
      return s;
    }

    remaining -= chunk.size();
    *checksum_value =
        crc32c::Extend(*checksum_value, chunk.data(), chunk.size());

    // Stop on an empty read or once the size budget is used up.
    if (chunk.size() == 0 || remaining == 0) {
      break;
    }
  }

  return s;
}

1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
// Deletes the entries of `dir` (through db_env_), skipping every file whose
// parsed FileType has its bit (1 << type) set in file_type_filter. Names that
// cannot be parsed are always deleted. All errors are ignored.
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) {
  std::vector<std::string> entries;
  db_env_->GetChildren(dir, &entries);  // ignore errors

  for (const auto& name : entries) {
    uint64_t number;
    FileType type;
    const bool parsed = ParseFileName(name, &number, &type);
    // `type` is only meaningful when parsing succeeded.
    const bool keep = parsed && (file_type_filter & (1 << type));
    if (!keep) {
      db_env_->DeleteFile(dir + "/" + name);  // ignore errors
    }
  }
}

H
Hasnain Lakhani 已提交
1425
// Removes leftovers from the backup directory:
//   * files in the shared directory that are unknown to backuped_file_infos_
//     or have a zero reference count;
//   * private directories that are temporary (name contains ".tmp") or whose
//     numeric id is not present in backups_.
//
// Individual delete failures are only logged. A failure to list the shared or
// private directory is returned to the caller.
Status BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);
  Log(options_.info_log, "Starting garbage collection");

  // delete obsolete shared files
  std::vector<std::string> shared_children;
  Status list_status = backup_env_->GetChildren(
      GetAbsolutePath(GetSharedFileRel()), &shared_children);
  if (!list_status.ok()) {
    return list_status;
  }
  for (auto& child : shared_children) {
    std::string rel_fname = GetSharedFileRel(child);
    auto info_it = backuped_file_infos_.find(rel_fname);
    const bool referenced = info_it != backuped_file_infos_.end() &&
                            info_it->second->refs != 0;
    if (referenced) {
      continue;
    }
    // this might be a directory, but DeleteFile will just fail in that
    // case, so we're good
    Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
    Log(options_.info_log, "Deleting %s -- %s", rel_fname.c_str(),
        s.ToString().c_str());
    backuped_file_infos_.erase(rel_fname);
  }

  // delete obsolete private files
  std::vector<std::string> private_children;
  list_status = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                                         &private_children);
  if (!list_status.ok()) {
    return list_status;
  }
  for (auto& child : private_children) {
    BackupID backup_id = 0;
    bool tmp_dir = child.find(".tmp") != std::string::npos;
    sscanf(child.c_str(), "%u", &backup_id);

    if (!tmp_dir) {
      // Non-temporary entries survive when the name is not a number or the
      // backup is still alive.
      const bool not_a_number = (backup_id == 0);
      if (not_a_number || backups_.find(backup_id) != backups_.end()) {
        continue;
      }
    }

    // here we have to delete the dir and all its children
    std::string full_private_path =
        GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir));
    std::vector<std::string> subchildren;
    backup_env_->GetChildren(full_private_path, &subchildren);
    for (auto& subchild : subchildren) {
      const std::string victim = full_private_path + subchild;
      Status s = backup_env_->DeleteFile(victim);
      Log(options_.info_log, "Deleting %s -- %s", victim.c_str(),
          s.ToString().c_str());
    }
    // finally delete the private dir
    Status s = backup_env_->DeleteDir(full_private_path);
    Log(options_.info_log, "Deleting dir %s -- %s", full_private_path.c_str(),
        s.ToString().c_str());
  }

  return Status::OK();
}

// ------- BackupMeta class --------

1492 1493 1494
// Adds `file_info` to this backup.
//
// If no entry with the same filename exists in *file_infos_, the given
// FileInfo is inserted with a reference count of 1. Otherwise the existing
// entry's reference count is incremented, provided its checksum matches.
// On success the backup's size_ grows by file_info->size and the (shared)
// entry is appended to files_.
//
// Returns Corruption on a checksum mismatch or if the insertion fails.
Status BackupEngineImpl::BackupMeta::AddFile(
    std::shared_ptr<FileInfo> file_info) {
  auto entry = file_infos_->find(file_info->filename);
  if (entry != file_infos_->end()) {
    if (entry->second->checksum_value != file_info->checksum_value) {
      return Status::Corruption(
          "Checksum mismatch for existing backup file. Delete old backups and "
          "try again.");
    }
    ++entry->second->refs;  // increase refcount if already present
  } else {
    auto inserted = file_infos_->insert({file_info->filename, file_info});
    if (!inserted.second) {
      // if this happens, something is seriously wrong
      return Status::Corruption("In memory metadata insertion error");
    }
    entry = inserted.first;
    entry->second->refs = 1;
  }

  size_ += file_info->size;
  files_.push_back(entry->second);

  return Status::OK();
}

1519 1520
// Releases this backup's references to its files, forgets the file list and
// resets timestamp_ to 0. When delete_meta is true the meta file is removed as
// well; a meta file that does not exist is not treated as an error.
//
// Returns the status of the meta-file existence check / deletion, or OK when
// delete_meta is false.
Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
  Status s;
  for (auto it = files_.begin(); it != files_.end(); ++it) {
    --(*it)->refs;  // decrease refcount
  }
  files_.clear();

  // delete meta file
  if (delete_meta) {
    s = env_->FileExists(meta_filename_);
    if (s.IsNotFound()) {
      s = Status::OK();  // nothing to delete
    } else if (s.ok()) {
      s = env_->DeleteFile(meta_filename_);
    }
  }

  timestamp_ = 0;
  return s;
}

// each backup meta file is of the format:
// <timestamp>
1540
// <seq number>
I
Igor Canadi 已提交
1541
// <number of files>
L
Lei Jin 已提交
1542 1543
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
I
Igor Canadi 已提交
1544
// ...
I
Igor Canadi 已提交
1545 1546
Status BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir) {
I
Igor Canadi 已提交
1547 1548 1549 1550 1551 1552 1553 1554
  assert(Empty());
  Status s;
  unique_ptr<SequentialFile> backup_meta_file;
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

1555 1556
  unique_ptr<SequentialFileReader> backup_meta_reader(
      new SequentialFileReader(std::move(backup_meta_file)));
1557 1558
  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  Slice data;
1559
  s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
I
Igor Canadi 已提交
1560 1561

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
L
Lei Jin 已提交
1562
    return s.ok() ? Status::Corruption("File size too big") : s;
I
Igor Canadi 已提交
1563 1564 1565 1566
  }
  buf[data.size()] = 0;

  uint32_t num_files = 0;
1567 1568 1569 1570 1571
  char *next;
  timestamp_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  sequence_number_ = strtoull(data.data(), &next, 10);
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1572
  num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1573
  data.remove_prefix(next - data.data() + 1); // +1 for '\n'
I
Igor Canadi 已提交
1574

1575
  std::vector<std::shared_ptr<FileInfo>> files;
1576

1577 1578
  Slice checksum_prefix("crc32 ");

I
Igor Canadi 已提交
1579
  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
L
Lei Jin 已提交
1580 1581 1582
    auto line = GetSliceUntil(&data, '\n');
    std::string filename = GetSliceUntil(&line, ' ').ToString();

I
Igor Canadi 已提交
1583
    uint64_t size;
1584 1585
    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
    if (file_info) {
1586 1587 1588 1589 1590 1591
      size = file_info->size;
    } else {
      s = env_->GetFileSize(backup_dir + "/" + filename, &size);
      if (!s.ok()) {
        return s;
      }
I
Igor Canadi 已提交
1592
    }
L
Lei Jin 已提交
1593 1594

    if (line.empty()) {
I
Igor Canadi 已提交
1595 1596
      return Status::Corruption("File checksum is missing for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
1597 1598 1599
    }

    uint32_t checksum_value = 0;
1600 1601
    if (line.starts_with(checksum_prefix)) {
      line.remove_prefix(checksum_prefix.size());
1602 1603
      checksum_value = static_cast<uint32_t>(
          strtoul(line.data(), nullptr, 10));
S
sdong 已提交
1604
      if (line != rocksdb::ToString(checksum_value)) {
I
Igor Canadi 已提交
1605 1606
        return Status::Corruption("Invalid checksum value for " + filename +
                                  " in " + meta_filename_);
L
Lei Jin 已提交
1607 1608
      }
    } else {
I
Igor Canadi 已提交
1609 1610
      return Status::Corruption("Unknown checksum type for " + filename +
                                " in " + meta_filename_);
L
Lei Jin 已提交
1611 1612
    }

1613
    files.emplace_back(new FileInfo(filename, size, checksum_value));
1614 1615
  }

I
Igor Canadi 已提交
1616 1617
  if (s.ok() && data.size() > 0) {
    // file has to be read completely. if not, we count it as corruption
I
Igor Canadi 已提交
1618 1619
    s = Status::Corruption("Tailing data in backup meta file in " +
                           meta_filename_);
I
Igor Canadi 已提交
1620 1621
  }

1622
  if (s.ok()) {
1623
    files_.reserve(files.size());
L
Lei Jin 已提交
1624 1625 1626 1627 1628
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
1629
    }
I
Igor Canadi 已提交
1630 1631 1632 1633 1634
  }

  return s;
}

I
Igor Canadi 已提交
1635
// Writes this backup's metadata to meta_filename_.
//
// The contents are: the timestamp, the sequence number and the number of
// files, one per line, followed by one "<filename> crc32 <checksum>" line per
// entry in files_. They are first written to meta_filename_ + ".tmp"; that
// file is synced when `sync` is true, closed, and then renamed to
// meta_filename_.
//
// Returns Status::InvalidArgument, before any file is created, when the
// contents would reach max_backup_meta_file_size_, since LoadFromFile()
// refuses meta files of that size. Otherwise returns the first error reported
// by the Env or by the writable file, or OK.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  // Build the text in a growable string. Formatting straight into a
  // fixed-size buffer with accumulated snprintf() return values is unsafe
  // here: snprintf() reports the length it *wanted* to write, so after a
  // truncation the running offset would point past the end of the buffer.
  std::string contents;
  contents.append(rocksdb::ToString(timestamp_));
  contents.push_back('\n');
  contents.append(rocksdb::ToString(sequence_number_));
  contents.push_back('\n');
  contents.append(rocksdb::ToString(files_.size()));
  contents.push_back('\n');
  for (const auto& file : files_) {
    // use crc32 for now, switch to something else if needed
    contents.append(file->filename.c_str());
    contents.append(" crc32 ");
    contents.append(rocksdb::ToString(file->checksum_value));
    contents.push_back('\n');
  }

  if (contents.size() >= static_cast<size_t>(max_backup_meta_file_size_)) {
    return Status::InvalidArgument("Backup meta file is too big to store: " +
                                   meta_filename_);
  }

  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  s = backup_meta_file->Append(Slice(contents));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    // Only replace the previous meta file once the new one is complete.
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}

I
Igor Canadi 已提交
1672 1673 1674
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
 public:
I
Igor Canadi 已提交
1675 1676 1677
  BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
      : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}

I
Igor Canadi 已提交
1678 1679
  virtual ~BackupEngineReadOnlyImpl() {}

I
Igor Sugak 已提交
1680
  virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
I
Igor Canadi 已提交
1681 1682 1683
    backup_engine_->GetBackupInfo(backup_info);
  }

I
Igor Sugak 已提交
1684 1685
  virtual void GetCorruptedBackups(
      std::vector<BackupID>* corrupt_backup_ids) override {
H
Hasnain Lakhani 已提交
1686 1687 1688
    backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
  }

I
Igor Canadi 已提交
1689 1690
  virtual Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1691
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1692 1693 1694 1695 1696 1697
    return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                               restore_options);
  }

  virtual Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1698
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1699 1700 1701 1702
    return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                     restore_options);
  }

1703 1704
  Status Initialize() { return backup_engine_->Initialize(); }

I
Igor Canadi 已提交
1705
 private:
I
Igor Canadi 已提交
1706
  std::unique_ptr<BackupEngineImpl> backup_engine_;
I
Igor Canadi 已提交
1707 1708
};

I
Igor Canadi 已提交
1709 1710 1711 1712 1713 1714
// Creates and initializes a read-only backup engine.
//
// Fails with Status::InvalidArgument, without touching *backup_engine_ptr,
// when options.destroy_old_data is set. Otherwise *backup_engine_ptr receives
// the new engine (ownership passes to the caller) if initialization succeeds,
// or nullptr if it fails; the initialization status is returned.
Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
                                  BackupEngineReadOnly** backup_engine_ptr) {
  if (options.destroy_old_data) {
    return Status::InvalidArgument(
        "Can't destroy old data with ReadOnly BackupEngine");
  }

  std::unique_ptr<BackupEngineReadOnlyImpl> engine(
      new BackupEngineReadOnlyImpl(env, options));
  const Status init_status = engine->Initialize();
  if (init_status.ok()) {
    *backup_engine_ptr = engine.release();
    return Status::OK();
  }

  // `engine` is destroyed on return; the caller gets no object.
  *backup_engine_ptr = nullptr;
  return init_status;
}

I
Igor Canadi 已提交
1726 1727
// --- BackupableDB methods --------

1728
// Wraps `db` and creates a backup engine on db's Env. The engine is kept even
// if its initialization fails; the initialization result is stored in status_
// and consulted by the other methods.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
    : StackableDB(db) {
  BackupEngineImpl* const engine = new BackupEngineImpl(db->GetEnv(), options);
  status_ = engine->Initialize();
  backup_engine_ = engine;
}
I
Igor Canadi 已提交
1734 1735 1736 1737 1738 1739

// Destroys the backup engine created by the constructor.
BackupableDB::~BackupableDB() { delete backup_engine_; }

// Asks the backup engine to create a new backup of this database, passing
// `flush_before_backup` through. If the engine failed to initialize, that
// stored status is returned and no backup is attempted.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  return status_.ok()
             ? backup_engine_->CreateNewBackup(this, flush_before_backup)
             : status_;
}

void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1747 1748 1749
  if (!status_.ok()) {
    return;
  }
I
Igor Canadi 已提交
1750 1751 1752
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1753 1754
void
BackupableDB::GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) {
1755 1756 1757
  if (!status_.ok()) {
    return;
  }
H
Hasnain Lakhani 已提交
1758 1759 1760
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

I
Igor Canadi 已提交
1761
// Forwards to the backup engine's PurgeOldBackups() with
// `num_backups_to_keep`. Returns the stored initialization error instead if
// the engine failed to initialize.
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return status_.ok() ? backup_engine_->PurgeOldBackups(num_backups_to_keep)
                      : status_;
}

// Forwards to the backup engine's DeleteBackup() for `backup_id`. Returns the
// stored initialization error instead if the engine failed to initialize.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  return status_.ok() ? backup_engine_->DeleteBackup(backup_id) : status_;
}

I
Igor Canadi 已提交
1775 1776 1777 1778
// Forwards to the backup engine's StopBackup(). Unlike the other methods this
// one does not consult status_ first.
void BackupableDB::StopBackup() { backup_engine_->StopBackup(); }

H
Hasnain Lakhani 已提交
1779
// Forwards to the backup engine's GarbageCollect(). Returns the stored
// initialization error instead if the engine failed to initialize.
Status BackupableDB::GarbageCollect() {
  return status_.ok() ? backup_engine_->GarbageCollect() : status_;
}

I
Igor Canadi 已提交
1786 1787 1788
// --- RestoreBackupableDB methods ------

// Creates a backup engine on `db_env`. The engine is kept even if its
// initialization fails; the initialization result is stored in status_ and
// consulted by the other methods.
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                         const BackupableDBOptions& options) {
  BackupEngineImpl* const engine = new BackupEngineImpl(db_env, options);
  status_ = engine->Initialize();
  backup_engine_ = engine;
}
I
Igor Canadi 已提交
1794 1795 1796 1797 1798 1799 1800

// Destroys the backup engine created by the constructor.
RestoreBackupableDB::~RestoreBackupableDB() { delete backup_engine_; }

void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1801 1802 1803
  if (!status_.ok()) {
    return;
  }
I
Igor Canadi 已提交
1804 1805 1806
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1807 1808
void RestoreBackupableDB::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
1809 1810 1811
  if (!status_.ok()) {
    return;
  }
H
Hasnain Lakhani 已提交
1812 1813 1814
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

1815 1816 1817
// Forwards to the backup engine's RestoreDBFromBackup() with the same
// arguments. Returns the stored initialization error instead if the engine
// failed to initialize.
Status RestoreBackupableDB::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  return status_.ok()
             ? backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                                   restore_options)
             : status_;
}

1825 1826 1827
// Forwards to the backup engine's RestoreDBFromLatestBackup() with the same
// arguments. Returns the stored initialization error instead if the engine
// failed to initialize.
Status RestoreBackupableDB::RestoreDBFromLatestBackup(
    const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  return status_.ok()
             ? backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                         restore_options)
             : status_;
}

// Forwards to the backup engine's PurgeOldBackups() with
// `num_backups_to_keep`. Returns the stored initialization error instead if
// the engine failed to initialize.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return status_.ok() ? backup_engine_->PurgeOldBackups(num_backups_to_keep)
                      : status_;
}

// Forwards to the backup engine's DeleteBackup() for `backup_id`. Returns the
// stored initialization error instead if the engine failed to initialize.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  return status_.ok() ? backup_engine_->DeleteBackup(backup_id) : status_;
}

H
Hasnain Lakhani 已提交
1849
// Forwards to the backup engine's GarbageCollect(). Returns the stored
// initialization error instead if the engine failed to initialize.
Status RestoreBackupableDB::GarbageCollect() {
  return status_.ok() ? backup_engine_->GarbageCollect() : status_;
}

I
Igor Canadi 已提交
1856
}  // namespace rocksdb
I
Igor Canadi 已提交
1857 1858

#endif  // ROCKSDB_LITE