backupable_db.cc 40.8 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10 11
#ifndef ROCKSDB_LITE

I
Igor Canadi 已提交
12 13 14
#include "utilities/backupable_db.h"
#include "db/filename.h"
#include "util/coding.h"
L
Lei Jin 已提交
15
#include "util/crc32c.h"
I
Igor Canadi 已提交
16 17 18 19 20 21 22 23 24 25
#include "rocksdb/transaction_log.h"

#define __STDC_FORMAT_MACROS

#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <map>
#include <string>
#include <limits>
I
Igor Canadi 已提交
26
#include <atomic>
27
#include <unordered_map>
I
Igor Canadi 已提交
28 29 30

namespace rocksdb {

I
Igor Canadi 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
namespace {
// Throttles a stream of bytes to approximately max_bytes_per_second.
//
// Callers report how many bytes they just processed via ReportAndWait().
// Once at least bytes_per_check bytes have accumulated, the limiter compares
// the time those bytes actually took against the time they should have taken
// at the configured rate and sleeps for the difference.
class RateLimiter {
 public:
  // env supplies the clock (NowMicros) and the sleep primitive.
  RateLimiter(Env* env, uint64_t max_bytes_per_second, uint64_t bytes_per_check)
      : env_(env),
        max_bytes_per_second_(max_bytes_per_second),
        bytes_per_check_(bytes_per_check),
        micros_start_time_(env->NowMicros()),
        bytes_since_start_(0) {}

  // Adds bytes_since_last_call to the running total and, when the total has
  // reached bytes_per_check, sleeps if the bytes arrived faster than allowed.
  // The measuring interval is restarted after every check.
  void ReportAndWait(uint64_t bytes_since_last_call) {
    bytes_since_start_ += bytes_since_last_call;
    if (bytes_since_start_ >= bytes_per_check_) {
      uint64_t checked_at = env_->NowMicros();
      const uint64_t elapsed_micros = checked_at - micros_start_time_;
      // time the accumulated bytes are allowed to take at the target rate
      const uint64_t budget_micros =
          (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;

      if (elapsed_micros < budget_micros) {
        env_->SleepForMicroseconds(budget_micros - elapsed_micros);
        checked_at = env_->NowMicros();
      }

      // start a fresh interval
      micros_start_time_ = checked_at;
      bytes_since_start_ = 0;
    }
    // otherwise: not enough bytes yet to be rate-limited
  }

 private:
  Env* env_;
  uint64_t max_bytes_per_second_;
  uint64_t bytes_per_check_;
  uint64_t micros_start_time_;
  uint64_t bytes_since_start_;
  static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};
}  // namespace

I
Igor Canadi 已提交
72
// Writes every backup option to `logger`, one option per log line.
// Boolean options are printed as 0/1, pointers with %p.
void BackupableDBOptions::Dump(Logger* logger) const {
  // convert the boolean flags once, up front, so the Log calls stay short
  const int share_table_files_flag = static_cast<int>(share_table_files);
  const int sync_flag = static_cast<int>(sync);
  const int destroy_old_data_flag = static_cast<int>(destroy_old_data);
  const int backup_log_files_flag = static_cast<int>(backup_log_files);

  Log(logger, "        Options.backup_dir: %s", backup_dir.c_str());
  Log(logger, "        Options.backup_env: %p", backup_env);
  Log(logger, " Options.share_table_files: %d", share_table_files_flag);
  Log(logger, "          Options.info_log: %p", info_log);
  Log(logger, "              Options.sync: %d", sync_flag);
  Log(logger, "  Options.destroy_old_data: %d", destroy_old_data_flag);
  Log(logger, "  Options.backup_log_files: %d", backup_log_files_flag);
  Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
  Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit);
}

I
Igor Canadi 已提交
87 88
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
89
 public:
I
Igor Canadi 已提交
90 91
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
                   bool read_only = false);
I
Igor Canadi 已提交
92
  ~BackupEngineImpl();
I
Igor Canadi 已提交
93 94 95
  Status CreateNewBackup(DB* db, bool flush_before_backup = false);
  Status PurgeOldBackups(uint32_t num_backups_to_keep);
  Status DeleteBackup(BackupID backup_id);
I
Igor Canadi 已提交
96 97 98
  void StopBackup() {
    stop_backup_.store(true, std::memory_order_release);
  }
I
Igor Canadi 已提交
99 100

  void GetBackupInfo(std::vector<BackupInfo>* backup_info);
101 102 103 104 105 106 107 108 109 110
  Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir,
                             const std::string& wal_dir,
                             const RestoreOptions& restore_options =
                                 RestoreOptions());
  Status RestoreDBFromLatestBackup(const std::string& db_dir,
                                   const std::string& wal_dir,
                                   const RestoreOptions& restore_options =
                                       RestoreOptions()) {
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir,
                               restore_options);
I
Igor Canadi 已提交
111 112 113
  }

 private:
114 115
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);

L
Lei Jin 已提交
116 117 118 119 120 121 122 123 124 125
  struct FileInfo {
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

    int refs;
    const std::string filename;
    const uint64_t size;
    uint32_t checksum_value;
  };

I
Igor Canadi 已提交
126 127 128
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
L
Lei Jin 已提交
129
        std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
I
Igor Canadi 已提交
130
      : timestamp_(0), size_(0), meta_filename_(meta_filename),
L
Lei Jin 已提交
131
        file_infos_(file_infos), env_(env) {}
I
Igor Canadi 已提交
132 133 134 135 136 137 138 139 140 141 142 143

    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
144 145 146 147 148 149
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
150

L
Lei Jin 已提交
151 152
    Status AddFile(const FileInfo& file_info);

I
Igor Canadi 已提交
153
    void Delete(bool delete_meta = true);
I
Igor Canadi 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167

    bool Empty() {
      return files_.empty();
    }

    const std::vector<std::string>& GetFiles() {
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

   private:
    int64_t timestamp_;
168 169 170
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
171 172 173 174
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
    std::vector<std::string> files_;
L
Lei Jin 已提交
175
    std::unordered_map<std::string, FileInfo>* file_infos_;
I
Igor Canadi 已提交
176
    Env* env_;
177 178

    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
I
Igor Canadi 已提交
179 180 181 182 183 184 185 186 187 188 189
  }; // BackupMeta

  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.size() == 0 || relative_path[0] != '/');
    return options_.backup_dir + "/" + relative_path;
  }
  inline std::string GetPrivateDirRel() const {
    return "private";
  }
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
190 191
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
192
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
193 194
    return GetPrivateDirRel() + "/" + std::to_string(backup_id) +
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
195
  }
I
Igor Canadi 已提交
196 197
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
I
Igor Canadi 已提交
198
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
199
    return "shared/" + file + (tmp ? ".tmp" : "");
I
Igor Canadi 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
  }
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
  }
  inline std::string GetBackupMetaDir() const {
    return GetAbsolutePath("meta");
  }
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
    return GetBackupMetaDir() + "/" + std::to_string(backup_id);
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
I
Igor Canadi 已提交
219
                  RateLimiter* rate_limiter,
I
Igor Canadi 已提交
220
                  uint64_t* size = nullptr,
L
Lei Jin 已提交
221
                  uint32_t* checksum_value = nullptr,
I
Igor Canadi 已提交
222 223 224 225 226 227 228
                  uint64_t size_limit = 0);
  // if size_limit == 0, there is no size limit, copy everything
  Status BackupFile(BackupID backup_id,
                    BackupMeta* backup,
                    bool shared,
                    const std::string& src_dir,
                    const std::string& src_fname, // starts with "/"
I
Igor Canadi 已提交
229
                    RateLimiter* rate_limiter,
I
Igor Canadi 已提交
230
                    uint64_t size_limit = 0);
L
Lei Jin 已提交
231 232 233 234 235 236

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

I
Igor Canadi 已提交
237 238
  // Will delete all the files we don't need anymore
  // If full_scan == true, it will do the full scan of files/ directory
L
Lei Jin 已提交
239
  // and delete all the files that are not referenced from backuped_file_infos__
I
Igor Canadi 已提交
240 241 242 243 244
  void GarbageCollection(bool full_scan);

  // backup state data
  BackupID latest_backup_id_;
  std::map<BackupID, BackupMeta> backups_;
L
Lei Jin 已提交
245
  std::unordered_map<std::string, FileInfo> backuped_file_infos_;
I
Igor Canadi 已提交
246
  std::vector<BackupID> obsolete_backups_;
I
Igor Canadi 已提交
247
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
248 249 250 251 252 253

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

I
Igor Canadi 已提交
254 255 256 257 258 259
  // directories
  unique_ptr<Directory> backup_directory_;
  unique_ptr<Directory> shared_directory_;
  unique_ptr<Directory> meta_directory_;
  unique_ptr<Directory> private_directory_;

I
Igor Canadi 已提交
260 261
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
I
Igor Canadi 已提交
262
  bool read_only_;
I
Igor Canadi 已提交
263 264
};

265 266
// Factory: builds a writable BackupEngineImpl for the given options.
// The returned object is heap-allocated with `new`.
BackupEngine* BackupEngine::NewBackupEngine(
    Env* db_env, const BackupableDBOptions& options) {
  BackupEngine* engine = new BackupEngineImpl(db_env, options);
  return engine;
}

// Opens (and, unless read_only, repairs) the backup directory.
//
// Steps:
//  1. create the directory skeleton (writable mode only);
//  2. register one BackupMeta per validly named file in meta/;
//  3. either destroy all old backups (options.destroy_old_data) or load their
//     metadata, dropping the backups whose metadata fails to load;
//  4. determine the latest backup id and drop backups that claim to be newer;
//  5. in writable mode, rewrite LATEST_BACKUP and garbage collect.
//
// In read-only mode nothing in the backup directory is modified.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options,
                                   bool read_only)
    : latest_backup_id_(0),
      stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {
  if (read_only_) {
    Log(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // create all the dirs we need
    backup_env_->CreateDirIfMissing(GetAbsolutePath());
    backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
      backup_env_->NewDirectory(GetAbsolutePath(GetSharedFileRel()),
                                &shared_directory_);
    }
    backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
    backup_env_->NewDirectory(GetAbsolutePath(GetPrivateDirRel()),
                              &private_directory_);
    backup_env_->CreateDirIfMissing(GetBackupMetaDir());
    backup_env_->NewDirectory(GetBackupMetaDir(), &meta_directory_);
  }

  std::vector<std::string> backup_meta_files;
  backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    // a valid meta file name is exactly the decimal form of a non-zero id
    if (backup_id == 0 || file != std::to_string(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    backups_.insert(std::make_pair(
        backup_id, BackupMeta(GetBackupMetaFile(backup_id),
                              &backuped_file_infos_, backup_env_)));
  }

  if (options_.destroy_old_data) { // Destroy old data
    assert(!read_only_);
    for (auto& backup : backups_) {
      backup.second.Delete();
      obsolete_backups_.push_back(backup.first);
    }
    backups_.clear();
    // start from beginning
    latest_backup_id_ = 0;
    // GarbageCollection() will do the actual deletion
  } else { // Load data from storage
    // load the backups if any
    for (auto& backup : backups_) {
      Status s = backup.second.LoadFromFile(options_.backup_dir);
      if (!s.ok()) {
        Log(options_.info_log, "Backup %u corrupted -- %s", backup.first,
            s.ToString().c_str());
        if (!read_only_) {
          Log(options_.info_log, "-> Deleting backup %u", backup.first);
        }
        backup.second.Delete(!read_only_);
        obsolete_backups_.push_back(backup.first);
      }
    }
    // delete obsolete backups from the structure
    for (auto ob : obsolete_backups_) {
      backups_.erase(ob);
    }

    Status s = GetLatestBackupFileContents(&latest_backup_id_);

    // If latest backup file is corrupted or non-existent
    // set latest backup as the biggest backup we have
    // or 0 if we have no backups
    if (!s.ok() ||
        backups_.find(latest_backup_id_) == backups_.end()) {
      auto itr = backups_.end();
      latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
    }
  }

  // delete any backups that claim to be later than latest
  for (auto itr = backups_.upper_bound(latest_backup_id_);
       itr != backups_.end();) {
    // a read-only engine must not remove the metadata file from storage;
    // this mirrors the handling of corrupted backups above
    itr->second.Delete(!read_only_);
    obsolete_backups_.push_back(itr->first);
    itr = backups_.erase(itr);
  }

  if (!read_only_) {
    PutLatestBackupFileContents(latest_backup_id_);  // Ignore errors
    GarbageCollection(true);
  }
  Log(options_.info_log, "Initialized BackupEngine, the latest backup is %u.",
      latest_backup_id_);
}

I
Igor Canadi 已提交
376
// Flushes the info log on destruction; members clean up after themselves.
BackupEngineImpl::~BackupEngineImpl() {
  LogFlush(options_.info_log);
}
I
Igor Canadi 已提交
377

I
Igor Canadi 已提交
378
// Creates backup number latest_backup_id_ + 1 from the live files of `db`.
//
// File deletions are disabled on the DB while its files are being copied and
// are re-enabled on every exit path. Files are first copied into a temporary
// private directory which is renamed into place once all copies succeeded;
// the backup becomes visible when its id is written to LATEST_BACKUP.
//
// If flush_before_backup is false and options.backup_log_files is set, the
// live WAL files are backed up as well.
//
// On any failure the partially created backup is dropped from backups_, a
// full garbage collection is run and the failing status is returned.
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
  assert(!read_only_);
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;
  uint64_t sequence_number = db->GetLatestSequenceNumber();

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
  if (s.ok() && !flush_before_backup && options_.backup_log_files) {
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    db->EnableFileDeletions();
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
  auto ret = backups_.insert(std::make_pair(
      new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
                                &backuped_file_infos_, backup_env_)));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup.RecordTimestamp();
  new_backup.SetSequenceNumber(sequence_number);

  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

  // create temporary private dir
  s = backup_env_->CreateDir(
      GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));

  unique_ptr<RateLimiter> rate_limiter;
  if (options_.backup_rate_limit > 0) {
    // check the rate roughly ten times per second's worth of data
    copy_file_buffer_size_ = options_.backup_rate_limit / 10;
    rate_limiter.reset(new RateLimiter(db_env_, options_.backup_rate_limit,
                                       copy_file_buffer_size_));
  }

  // copy live_files
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    if (!ok) {
      assert(false);
      // Do not return from here: file deletions are still disabled on the DB
      // and the half-built backup is still registered in backups_. Leaving
      // the loop with a failed status routes through the common cleanup
      // below.
      s = Status::Corruption("Can't parse file name. This is very bad");
      break;
    }
    // we should only get sst, manifest and current files here
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile);

    // rules:
    // * if it's kTableFile, then it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    s = BackupFile(new_backup_id,
                   &new_backup,
                   options_.share_table_files && type == kTableFile,
                   db->GetName(),            /* src_dir */
                   live_files[i],            /* src_fname */
                   rate_limiter.get(),
                   (type == kDescriptorFile) ? manifest_file_size : 0);
  }

  // copy WAL files
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into the private directory of the new backup
      s = BackupFile(new_backup_id,
                     &new_backup,
                     false, /* not shared */
                     db->GetOptions().wal_dir,
                     live_wal_files[i]->PathName(),
                     rate_limiter.get());
    }
  }

  // we are done copying (successfully or not), enable file deletions
  db->EnableFileDeletions();

  if (s.ok()) {
    // move tmp private backup to real backup folder
    s = backup_env_->RenameFile(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)), // tmp
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
  }

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup.StoreToFile(options_.sync);
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
  if (s.ok() && options_.sync) {
    // fsync every directory that may have gained or lost an entry
    unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      backup_private_directory->Fsync();
    }
    if (private_directory_ != nullptr) {
      private_directory_->Fsync();
    }
    if (meta_directory_ != nullptr) {
      meta_directory_->Fsync();
    }
    if (shared_directory_ != nullptr) {
      shared_directory_->Fsync();
    }
    if (backup_directory_ != nullptr) {
      backup_directory_->Fsync();
    }
  }

  if (!s.ok()) {
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
    backups_.erase(new_backup_id);
    GarbageCollection(true);
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");
  return s;
}

I
Igor Canadi 已提交
519
// Deletes the backups with the smallest ids until at most
// num_backups_to_keep remain, then garbage collects (no full scan).
// Always returns OK.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(!read_only_);
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);
  while (backups_.size() > num_backups_to_keep) {
    // backups_ is ordered by id, so begin() is the backup with the lowest id
    auto oldest = backups_.begin();
    Log(options_.info_log, "Deleting backup %u", oldest->first);
    oldest->second.Delete();
    obsolete_backups_.push_back(oldest->first);
    backups_.erase(oldest);
  }
  GarbageCollection(false);
  return Status::OK();
}

I
Igor Canadi 已提交
533
// Deletes the backup with the given id and garbage collects (no full scan).
// Returns NotFound when no such backup is known.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  assert(!read_only_);
  Log(options_.info_log, "Deleting backup %u", backup_id);

  auto found = backups_.find(backup_id);
  if (found != backups_.end()) {
    found->second.Delete();
    obsolete_backups_.push_back(backup_id);
    backups_.erase(found);
    GarbageCollection(false);
    return Status::OK();
  }
  return Status::NotFound("Backup not found");
}

I
Igor Canadi 已提交
547
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
I
Igor Canadi 已提交
548 549 550 551 552 553 554 555 556
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
    if (!backup.second.Empty()) {
      backup_info->push_back(BackupInfo(
          backup.first, backup.second.GetTimestamp(), backup.second.GetSize()));
    }
  }
}

557 558 559
// Restores backup `backup_id` into db_dir / wal_dir.
//
// With restore_options.keep_log_files the existing log files are kept and
// archived log files are moved back into wal_dir; otherwise wal_dir, its
// archive directory and db_dir are emptied first. Every file of the backup
// is then copied into place (log files to wal_dir, everything else to
// db_dir) and its checksum is compared against the one recorded for the
// backup.
//
// Returns NotFound for an unknown or empty backup, Corruption when a backed
// up file name cannot be parsed, has no recorded file info, or fails the
// checksum comparison, and otherwise the first error reported while moving
// or copying files.
Status BackupEngineImpl::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  if (backup.Empty()) {
    return Status::NotFound("Backup not found");
  }

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);
  Log(options_.info_log, "keep_log_files: %d\n",
      static_cast<int>(restore_options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  if (restore_options.keep_log_files) {
    // delete files in db_dir, but keep all the log files
    DeleteChildren(db_dir, 1 << kLogFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archive_files;
    db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
    for (const auto& f : archive_files) {
      uint64_t number;
      FileType type;
      bool ok = ParseFileName(f, &number, &type);
      if (ok && type == kLogFile) {
        Log(options_.info_log, "Moving log file from archive/ to wal_dir: %s",
            f.c_str());
        Status s =
            db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
        if (!s.ok()) {
          // if we can't move log file from archive_dir to wal_dir,
          // we should fail, since it might mean data loss
          return s;
        }
      }
    }
  } else {
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  }

  unique_ptr<RateLimiter> rate_limiter;
  if (options_.restore_rate_limit > 0) {
    // check the rate roughly ten times per second's worth of data
    copy_file_buffer_size_ = options_.restore_rate_limit / 10;
    rate_limiter.reset(new RateLimiter(db_env_, options_.restore_rate_limit,
                                       copy_file_buffer_size_));
  }
  Status s;
  for (auto& file : backup.GetFiles()) {
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file> or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      return Status::Corruption("Backup corrupted");
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    uint32_t checksum_value = 0;
    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
                 rate_limiter.get(), nullptr /* size */, &checksum_value);
    if (!s.ok()) {
      break;
    }

    const auto iter = backuped_file_infos_.find(file);
    assert(iter != backuped_file_infos_.end());
    if (iter == backuped_file_infos_.end()) {
      // asserts are compiled out of release builds; never dereference end()
      s = Status::Corruption("File info missing for restored file");
      break;
    }
    if (iter->second.checksum_value != checksum_value) {
      s = Status::Corruption("Checksum check failed");
      break;
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}

// latest backup id is an ASCII representation of latest backup id
I
Igor Canadi 已提交
654
// Reads the id stored in the LATEST_BACKUP file into *latest_backup.
//
// Returns the error from opening or reading the file, or Corruption when the
// file is empty or the metadata file of the id it names does not exist.
// *latest_backup is only written once the file has been read successfully;
// it is 0 when the contents do not start with a number.
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
  Status s;
  unique_ptr<SequentialFile> file;
  s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
                                     &file,
                                     EnvOptions());
  if (!s.ok()) {
    return s;
  }

  // a 32-bit id has at most 10 decimal digits
  char buf[10];
  Slice data;
  s = file->Read(sizeof(buf), &data, buf);
  if (!s.ok() || data.size() == 0) {
    return s.ok() ? Status::Corruption("Latest backup file corrupted") : s;
  }

  // Parse from a NUL-terminated copy: the bytes `data` refers to are not
  // terminated, and they are not guaranteed to live in `buf`.
  const std::string contents(data.data(), data.size());

  *latest_backup = 0;
  sscanf(contents.c_str(), "%u", latest_backup);
  if (!backup_env_->FileExists(GetBackupMetaFile(*latest_backup))) {
    s = Status::Corruption("Latest backup file corrupted");
  }
  // report the corruption instead of unconditionally returning OK
  return s;
}

// this operation HAS to be atomic
// writing 4 bytes to the file is atomic alright, but we should *never*
// do something like 1. delete file, 2. write new file
// We write to a tmp file and then atomically rename
I
Igor Canadi 已提交
684
// Stores `latest_backup` (decimal, followed by a newline) in LATEST_BACKUP.
//
// The value is written to LATEST_BACKUP.tmp, synced when options_.sync is
// set, closed, and then renamed over the real file so that readers never
// observe a partially written file. Returns the first error encountered.
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
  assert(!read_only_);
  Status s;
  unique_ptr<WritableFile> file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = backup_env_->NewWritableFile(GetLatestBackupFile(true),
                                   &file,
                                   env_options);
  if (!s.ok()) {
    backup_env_->DeleteFile(GetLatestBackupFile(true));
    return s;
  }

  // up to 10 digits + '\n' + terminating NUL = 12 bytes
  char file_contents[16];
  const int len = snprintf(file_contents, sizeof(file_contents), "%u\n",
                           latest_backup);
  assert(len > 0 && static_cast<size_t>(len) < sizeof(file_contents));
  s = file->Append(Slice(file_contents, static_cast<size_t>(len)));
  if (s.ok() && options_.sync) {
    // a failed sync must not be followed by the rename below
    s = file->Sync();
  }
  if (s.ok()) {
    s = file->Close();
  }
  if (s.ok()) {
    // atomically replace real file with new tmp
    s = backup_env_->RenameFile(GetLatestBackupFile(true),
                                GetLatestBackupFile(false));
  }
  return s;
}

I
Igor Canadi 已提交
715 716
// Copies `src` (opened through src_env) to `dst` (created through dst_env).
//
//  sync           -- sync dst after a successful copy
//  rate_limiter   -- when non-null, is told about every chunk and may sleep
//  size           -- when non-null, receives the number of bytes read
//  checksum_value -- when non-null, receives the crc32c of the bytes read
//  size_limit     -- copy at most this many bytes; 0 means no limit
//
// Returns Incomplete("Backup stopped") as soon as the stop flag is observed,
// otherwise the first error from opening, reading, appending or syncing.
Status BackupEngineImpl::CopyFile(const std::string& src,
                                  const std::string& dst, Env* src_env,
                                  Env* dst_env, bool sync,
                                  RateLimiter* rate_limiter, uint64_t* size,
                                  uint32_t* checksum_value,
                                  uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  env_options.use_os_buffer = false;
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  // copy_file_buffer_size_ is derived from the rate limit (limit / 10) and is
  // therefore 0 for limits below 10 bytes/s. A zero-sized buffer would make
  // the first read return no data, which ends the loop and reports an empty
  // copy as success -- always read at least one byte at a time.
  const size_t buffer_size = std::max<size_t>(copy_file_buffer_size_, 1);
  unique_ptr<char[]> buf(new char[buffer_size]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    const size_t buffer_to_read =
        (buffer_size < size_limit) ? buffer_size
                                   : static_cast<size_t>(size_limit);
    s = src_file->Read(buffer_to_read, &data, buf.get());
    if (!s.ok()) {
      return s;
    }
    size_limit -= data.size();

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
                                       data.size());
    }
    s = dst_file->Append(data);
    if (rate_limiter != nullptr) {
      rate_limiter->ReportAndWait(data.size());
    }
    // stop on error, at end of file (empty read) or once the limit is used up
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dst_file->Sync();
  }

  return s;
}

// src_fname will always start with "/"
I
Igor Canadi 已提交
784 785 786
// Adds one DB file to the backup described by `backup`.
//
// backup_id    - id of the backup being created (used for private files)
// backup       - metadata object that receives the new FileInfo on success
// shared       - true if the file goes to the shared directory, false if it
//                goes to the backup's private directory
// src_dir      - directory of the source file
// src_fname    - source file name; must start with "/"
// rate_limiter - optional limiter forwarded to CopyFile (may be nullptr)
// size_limit   - maximum number of bytes to copy/checksum; 0 means no limit
//
// Returns the first error encountered while checksumming, copying, renaming
// or registering the file; OK otherwise.
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                                    bool shared, const std::string& src_dir,
                                    const std::string& src_fname,
                                    RateLimiter* rate_limiter,
                                    uint64_t size_limit) {

  assert(src_fname.size() > 0 && src_fname[0] == '/');
  std::string dst_relative = src_fname.substr(1);
  std::string dst_relative_tmp;
  if (shared) {
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
  } else {
    dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
  }
  std::string dst_path = GetAbsolutePath(dst_relative);
  std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);
  Status s;
  // Initialized so that a failed (and deliberately ignored) GetFileSize below
  // can never leak an indeterminate value into the backup metadata.
  uint64_t size = 0;

  // if it's shared, we also need to check if it exists -- if it does,
  // no need to copy it again
  uint32_t checksum_value = 0;
  if (shared && backup_env_->FileExists(dst_path)) {
    backup_env_->GetFileSize(dst_path, &size); // Ignore error
    Log(options_.info_log, "%s already present, calculate checksum",
        src_fname.c_str());
    s = CalculateChecksum(src_dir + src_fname,
                          db_env_,
                          size_limit,
                          &checksum_value);
  } else {
    Log(options_.info_log, "Copying %s", src_fname.c_str());
    s = CopyFile(src_dir + src_fname,
                 dst_path_tmp,
                 db_env_,
                 backup_env_,
                 options_.sync,
                 rate_limiter,
                 &size,
                 &checksum_value,
                 size_limit);
    // Only shared files are moved to their final name here; a private file
    // stays at the temporary path computed above.
    if (s.ok() && shared) {
      s = backup_env_->RenameFile(dst_path_tmp, dst_path);
    }
  }
  if (s.ok()) {
    s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
  }
  return s;
}

I
Igor Canadi 已提交
837 838 839
// Computes the crc32c of the first `size_limit` bytes of `src` (the whole
// file when `size_limit` is 0) and stores it in `*checksum_value`.
// Returns Incomplete if the backup is stopped while reading, or the error
// reported by the environment when opening/reading the file.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;

  // 0 is the "unlimited" marker; translate it to the largest possible budget.
  uint64_t bytes_left = size_limit;
  if (bytes_left == 0) {
    bytes_left = std::numeric_limits<uint64_t>::max();
  }

  EnvOptions read_options;
  read_options.use_mmap_writes = false;
  read_options.use_os_buffer = false;

  std::unique_ptr<SequentialFile> reader;
  Status s = src_env->NewSequentialFile(src, &reader, read_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<char[]> scratch(new char[copy_file_buffer_size_]);
  Slice chunk;

  for (;;) {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }

    // Never ask for more than the remaining budget.
    size_t request;
    if (copy_file_buffer_size_ < bytes_left) {
      request = copy_file_buffer_size_;
    } else {
      request = bytes_left;
    }

    s = reader->Read(request, &chunk, scratch.get());
    if (!s.ok()) {
      return s;
    }

    bytes_left -= chunk.size();
    *checksum_value =
        crc32c::Extend(*checksum_value, chunk.data(), chunk.size());

    // An empty read or an exhausted budget ends the loop.
    if (chunk.size() == 0 || bytes_left == 0) {
      break;
    }
  }

  return s;
}

877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) {
  std::vector<std::string> children;
  db_env_->GetChildren(dir, &children);  // ignore errors

  for (const auto& f : children) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(f, &number, &type);
    if (ok && (file_type_filter & (1 << type))) {
      // don't delete this file
      continue;
    }
    db_env_->DeleteFile(dir + "/" + f);  // ignore errors
  }
}

I
Igor Canadi 已提交
894
void BackupEngineImpl::GarbageCollection(bool full_scan) {
I
Igor Canadi 已提交
895
  assert(!read_only_);
I
Igor Canadi 已提交
896 897
  Log(options_.info_log, "Starting garbage collection");
  std::vector<std::string> to_delete;
L
Lei Jin 已提交
898 899
  for (auto& itr : backuped_file_infos_) {
    if (itr.second.refs == 0) {
I
Igor Canadi 已提交
900 901 902 903 904 905 906
      Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
      Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
          s.ToString().c_str());
      to_delete.push_back(itr.first);
    }
  }
  for (auto& td : to_delete) {
L
Lei Jin 已提交
907
    backuped_file_infos_.erase(td);
I
Igor Canadi 已提交
908 909 910 911 912 913 914 915 916 917 918
  }
  if (!full_scan) {
    // take care of private dirs -- if full_scan == true, then full_scan will
    // take care of them
    for (auto backup_id : obsolete_backups_) {
      std::string private_dir = GetPrivateFileRel(backup_id);
      Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
      Log(options_.info_log, "Deleting private dir %s -- %s",
          private_dir.c_str(), s.ToString().c_str());
    }
  }
919
  obsolete_backups_.clear();
I
Igor Canadi 已提交
920 921 922 923 924 925 926 927 928 929

  if (full_scan) {
    Log(options_.info_log, "Starting full scan garbage collection");
    // delete obsolete shared files
    std::vector<std::string> shared_children;
    backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
                             &shared_children);
    for (auto& child : shared_children) {
      std::string rel_fname = GetSharedFileRel(child);
      // if it's not refcounted, delete it
L
Lei Jin 已提交
930
      if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
I
Igor Canadi 已提交
931 932 933 934 935 936 937 938 939 940 941 942 943 944 945
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s", rel_fname.c_str());
        }
      }
    }

    // delete obsolete private files
    std::vector<std::string> private_children;
    backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                             &private_children);
    for (auto& child : private_children) {
      BackupID backup_id = 0;
I
Igor Canadi 已提交
946
      bool tmp_dir = child.find(".tmp") != std::string::npos;
I
Igor Canadi 已提交
947
      sscanf(child.c_str(), "%u", &backup_id);
I
Igor Canadi 已提交
948 949
      if (!tmp_dir && // if it's tmp_dir, delete it
          (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
I
Igor Canadi 已提交
950 951 952 953 954
        // it's either not a number or it's still alive. continue
        continue;
      }
      // here we have to delete the dir and all its children
      std::string full_private_path =
I
Igor Canadi 已提交
955
          GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir));
I
Igor Canadi 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974
      std::vector<std::string> subchildren;
      backup_env_->GetChildren(full_private_path, &subchildren);
      for (auto& subchild : subchildren) {
        Status s = backup_env_->DeleteFile(full_private_path + subchild);
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s",
              (full_private_path + subchild).c_str());
        }
      }
      // finally delete the private dir
      Status s = backup_env_->DeleteDir(full_private_path);
      Log(options_.info_log, "Deleted dir %s -- %s", full_private_path.c_str(),
          s.ToString().c_str());
    }
  }
}

// ------- BackupMeta class --------

I
Igor Canadi 已提交
975
// Registers `file_info` as part of this backup.
//
// If the file is not yet known to the shared file_infos_ table it is
// inserted with a refcount of 1; otherwise its refcount is incremented,
// provided the recorded checksum matches.
//
// Returns Corruption (and leaves this BackupMeta untouched) when the
// insertion fails or the checksum differs from the one already recorded.
Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) {
  auto itr = file_infos_->find(file_info.filename);
  if (itr == file_infos_->end()) {
    auto ret = file_infos_->insert({file_info.filename, file_info});
    if (!ret.second) {
      // if this happens, something is seriously wrong
      return Status::Corruption("In memory metadata insertion error");
    }
    ret.first->second.refs = 1;
  } else {
    if (itr->second.checksum_value != file_info.checksum_value) {
      return Status::Corruption("Checksum mismatch for existing backup file");
    }
    ++itr->second.refs; // increase refcount if already present
  }

  // Update our own bookkeeping only after validation succeeded. Doing it
  // earlier would list a file whose refcount was never taken, and Delete()
  // would later decrement a reference this backup does not hold.
  size_ += file_info.size;
  files_.push_back(file_info.filename);

  return Status::OK();
}

I
Igor Canadi 已提交
998
void BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
L
Lei Jin 已提交
999 1000 1001 1002
  for (const auto& file : files_) {
    auto itr = file_infos_->find(file);
    assert(itr != file_infos_->end());
    --(itr->second.refs); // decrease refcount
I
Igor Canadi 已提交
1003 1004 1005
  }
  files_.clear();
  // delete meta file
I
Igor Canadi 已提交
1006 1007 1008
  if (delete_meta) {
    env_->DeleteFile(meta_filename_);
  }
I
Igor Canadi 已提交
1009 1010 1011 1012 1013
  timestamp_ = 0;
}

// each backup meta file is of the format:
// <timestamp>
1014
// <seq number>
I
Igor Canadi 已提交
1015
// <number of files>
L
Lei Jin 已提交
1016 1017
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
I
Igor Canadi 已提交
1018
// ...
I
Igor Canadi 已提交
1019 1020
// Populates this (empty) BackupMeta from its meta file.
//
// backup_dir - directory against which the file names listed in the meta
//              file are resolved when querying their sizes
//
// The expected layout is three header lines (timestamp, sequence number,
// number of files) followed by one "<file> crc32 <value>" line per file.
//
// Returns Corruption when the file is too big, a header line cannot be
// parsed, a file line is missing/has an invalid checksum, or data is left
// over after the declared number of files; returns the environment's error
// when the meta file or a listed file cannot be accessed.
Status BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir) {
  assert(Empty());
  Status s;
  unique_ptr<SequentialFile> backup_meta_file;
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  Slice data;
  s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get());

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
    return s.ok() ? Status::Corruption("File size too big") : s;
  }
  // NUL-terminate so that sscanf below cannot run past the data read.
  buf[data.size()] = 0;

  uint32_t num_files = 0;
  int bytes_read = 0;

  // Each header line must both parse and be followed by at least one more
  // byte (its '\n'). Otherwise remove_prefix would be asked to drop more
  // bytes than the slice holds.
  if (sscanf(data.data(), "%" PRId64 "%n", &timestamp_, &bytes_read) != 1 ||
      static_cast<size_t>(bytes_read) >= data.size()) {
    return Status::Corruption("Invalid timestamp in backup meta file");
  }
  data.remove_prefix(bytes_read + 1); // +1 for '\n'

  if (sscanf(data.data(), "%" PRIu64 "%n", &sequence_number_, &bytes_read) !=
          1 ||
      static_cast<size_t>(bytes_read) >= data.size()) {
    return Status::Corruption("Invalid sequence number in backup meta file");
  }
  data.remove_prefix(bytes_read + 1); // +1 for '\n'

  if (sscanf(data.data(), "%u%n", &num_files, &bytes_read) != 1 ||
      static_cast<size_t>(bytes_read) >= data.size()) {
    return Status::Corruption("Invalid number of files in backup meta file");
  }
  data.remove_prefix(bytes_read + 1); // +1 for '\n'

  std::vector<FileInfo> files;

  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
    if (data.empty()) {
      // fewer file lines than the header announced: the file is truncated
      return Status::Corruption("Backup meta file is truncated");
    }

    auto line = GetSliceUntil(&data, '\n');
    std::string filename = GetSliceUntil(&line, ' ').ToString();

    uint64_t size;
    s = env_->GetFileSize(backup_dir + "/" + filename, &size);
    if (!s.ok()) {
      return s;
    }

    if (line.empty()) {
      return Status::Corruption("File checksum is missing");
    }

    uint32_t checksum_value = 0;
    if (line.starts_with("crc32 ")) {
      line.remove_prefix(6);
      sscanf(line.data(), "%u", &checksum_value);
      // The remainder of the line must be exactly the decimal rendering of
      // the parsed value. Comparing the whole line also covers the empty
      // line and the last digit.
      if (line.ToString() != std::to_string(checksum_value)) {
        return Status::Corruption("Invalid checksum value");
      }
    } else {
      return Status::Corruption("Unknown checksum type");
    }

    files.emplace_back(filename, size, checksum_value);
  }

  if (s.ok() && data.size() > 0) {
    // file has to be read completely. if not, we count it as corruption
    s = Status::Corruption("Tailing data in backup meta file");
  }

  if (s.ok()) {
    // Register the files only once the whole meta file has been validated.
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
    }
  }

  return s;
}

I
Igor Canadi 已提交
1095
// Serializes this BackupMeta to its meta file.
//
// The contents (timestamp, sequence number, number of files, then one
// "<file> crc32 <value>" line per file) are written to
// "<meta_filename_>.tmp", optionally synced, closed, and finally renamed to
// meta_filename_.
//
// sync - whether to Sync() the temporary file before closing it
//
// Returns Corruption when the serialized metadata does not fit below
// max_backup_meta_file_size_ (LoadFromFile rejects files of that size), or
// the first error reported by the environment.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  // Build the contents in a growable string. Accumulating snprintf return
  // values against a fixed buffer lets the write offset run past the end
  // once the output is truncated.
  std::string contents;
  contents.append(std::to_string(timestamp_)).append("\n");
  contents.append(std::to_string(sequence_number_)).append("\n");
  contents.append(std::to_string(files_.size())).append("\n");
  for (const auto& file : files_) {
    const auto iter = file_infos_->find(file);

    assert(iter != file_infos_->end());
    // use crc32 for now, switch to something else if needed
    contents.append(file)
        .append(" crc32 ")
        .append(std::to_string(iter->second.checksum_value))
        .append("\n");
  }

  if (contents.size() >= max_backup_meta_file_size_) {
    // Fail before touching the file system: nothing is written in this case.
    return Status::Corruption("Buffer too small to fit backup metadata");
  }

  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  s = backup_meta_file->Append(Slice(contents.data(), contents.size()));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    // Replace the meta file only after the temporary file is complete.
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}

I
Igor Canadi 已提交
1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
 public:
  BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options) {
    backup_engine_ = new BackupEngineImpl(db_env, options, true);
  }
  virtual ~BackupEngineReadOnlyImpl() {}

  virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) {
    backup_engine_->GetBackupInfo(backup_info);
  }

  virtual Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) {
    return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                               restore_options);
  }

  virtual Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) {
    return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                     restore_options);
  }

 private:
  BackupEngineImpl* backup_engine_;
};

// Creates a new BackupEngineReadOnlyImpl for `db_env` and `options`.
// Asserts and returns nullptr when options.destroy_old_data is set.
// The returned object is allocated with `new`.
BackupEngineReadOnly* BackupEngineReadOnly::NewReadOnlyBackupEngine(
    Env* db_env, const BackupableDBOptions& options) {
  if (options.destroy_old_data) {
    assert(false);
    return nullptr;
  }
  return new BackupEngineReadOnlyImpl(db_env, options);
}

I
Igor Canadi 已提交
1173 1174
// --- BackupableDB methods --------

1175
// Wraps `db` and creates a backup engine that uses the DB's own Env.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
    : StackableDB(db),
      backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
}
I
Igor Canadi 已提交
1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198

// Destroys the backup engine held in backup_engine_.
BackupableDB::~BackupableDB() {
  delete backup_engine_;
}

// Forwards to the backup engine, passing this DB as the database to back up.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  return backup_engine_->CreateNewBackup(this, flush_before_backup);
}

// Forwards to the backup engine, which fills `backup_info`.
void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

// Forwards to the backup engine's PurgeOldBackups.
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}

// Forwards to the backup engine's DeleteBackup.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}

I
Igor Canadi 已提交
1199 1200 1201 1202
// Forwards to the backup engine's StopBackup.
void BackupableDB::StopBackup() {
  backup_engine_->StopBackup();
}

I
Igor Canadi 已提交
1203 1204 1205 1206
// --- RestoreBackupableDB methods ------

// Creates the backup engine used for all restore/maintenance calls.
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                         const BackupableDBOptions& options)
    : backup_engine_(new BackupEngineImpl(db_env, options)) {
}
I
Igor Canadi 已提交
1208 1209 1210 1211 1212 1213 1214 1215 1216 1217

// Destroys the backup engine held in backup_engine_.
RestoreBackupableDB::~RestoreBackupableDB() {
  delete backup_engine_;
}

// Forwards to the backup engine, which fills `backup_info`.
void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

1218 1219 1220 1221 1222
// Forwards to the backup engine's RestoreDBFromBackup with the same
// arguments and returns its status.
Status RestoreBackupableDB::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  Status result = backup_engine_->RestoreDBFromBackup(backup_id, db_dir,
                                                      wal_dir, restore_options);
  return result;
}

1225 1226 1227 1228 1229
// Forwards to the backup engine's RestoreDBFromLatestBackup with the same
// arguments and returns its status.
Status RestoreBackupableDB::RestoreDBFromLatestBackup(
    const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  Status result = backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                            restore_options);
  return result;
}

// Forwards to the backup engine's PurgeOldBackups.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}

// Forwards to the backup engine's DeleteBackup.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}

}  // namespace rocksdb
I
Igor Canadi 已提交
1241 1242

#endif  // ROCKSDB_LITE