backupable_db.cc 33.9 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9 10 11 12
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "utilities/backupable_db.h"
#include "db/filename.h"
#include "util/coding.h"
L
Lei Jin 已提交
13
#include "util/crc32c.h"
I
Igor Canadi 已提交
14 15 16 17 18 19 20 21 22 23
#include "rocksdb/transaction_log.h"

#define __STDC_FORMAT_MACROS

#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <map>
#include <string>
#include <limits>
I
Igor Canadi 已提交
24
#include <atomic>
25
#include <unordered_map>
I
Igor Canadi 已提交
26 27 28

namespace rocksdb {

I
Igor Canadi 已提交
29 30
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
31
 public:
I
Igor Canadi 已提交
32 33
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options);
  ~BackupEngineImpl();
I
Igor Canadi 已提交
34 35 36
  Status CreateNewBackup(DB* db, bool flush_before_backup = false);
  Status PurgeOldBackups(uint32_t num_backups_to_keep);
  Status DeleteBackup(BackupID backup_id);
I
Igor Canadi 已提交
37 38 39
  void StopBackup() {
    stop_backup_.store(true, std::memory_order_release);
  }
I
Igor Canadi 已提交
40 41 42 43 44 45 46 47 48

  void GetBackupInfo(std::vector<BackupInfo>* backup_info);
  Status RestoreDBFromBackup(BackupID backup_id, const std::string &db_dir,
                             const std::string &wal_dir);
  Status RestoreDBFromLatestBackup(const std::string &db_dir,
                                   const std::string &wal_dir) {
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir);
  }

49 50
  void DeleteBackupsNewerThan(uint64_t sequence_number);

I
Igor Canadi 已提交
51
 private:
L
Lei Jin 已提交
52 53 54 55 56 57 58 59 60 61
  struct FileInfo {
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

    int refs;
    const std::string filename;
    const uint64_t size;
    uint32_t checksum_value;
  };

I
Igor Canadi 已提交
62 63 64
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
L
Lei Jin 已提交
65
        std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
I
Igor Canadi 已提交
66
      : timestamp_(0), size_(0), meta_filename_(meta_filename),
L
Lei Jin 已提交
67
        file_infos_(file_infos), env_(env) {}
I
Igor Canadi 已提交
68 69 70 71 72 73 74 75 76 77 78 79

    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
80 81 82 83 84 85
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
86

L
Lei Jin 已提交
87 88
    Status AddFile(const FileInfo& file_info);

I
Igor Canadi 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
    void Delete();

    bool Empty() {
      return files_.empty();
    }

    const std::vector<std::string>& GetFiles() {
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

   private:
    int64_t timestamp_;
104 105 106
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
107 108 109 110
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
    std::vector<std::string> files_;
L
Lei Jin 已提交
111
    std::unordered_map<std::string, FileInfo>* file_infos_;
I
Igor Canadi 已提交
112
    Env* env_;
113 114

    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
I
Igor Canadi 已提交
115 116 117 118 119 120 121 122 123 124 125
  }; // BackupMeta

  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.size() == 0 || relative_path[0] != '/');
    return options_.backup_dir + "/" + relative_path;
  }
  inline std::string GetPrivateDirRel() const {
    return "private";
  }
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
126 127
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
128
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
129 130
    return GetPrivateDirRel() + "/" + std::to_string(backup_id) +
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
131
  }
I
Igor Canadi 已提交
132 133
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
I
Igor Canadi 已提交
134
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
135
    return "shared/" + file + (tmp ? ".tmp" : "");
I
Igor Canadi 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
  }
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
  }
  inline std::string GetBackupMetaDir() const {
    return GetAbsolutePath("meta");
  }
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
    return GetBackupMetaDir() + "/" + std::to_string(backup_id);
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
                  uint64_t* size = nullptr,
L
Lei Jin 已提交
156
                  uint32_t* checksum_value = nullptr,
I
Igor Canadi 已提交
157 158 159 160 161 162 163 164
                  uint64_t size_limit = 0);
  // if size_limit == 0, there is no size limit, copy everything
  Status BackupFile(BackupID backup_id,
                    BackupMeta* backup,
                    bool shared,
                    const std::string& src_dir,
                    const std::string& src_fname, // starts with "/"
                    uint64_t size_limit = 0);
L
Lei Jin 已提交
165 166 167 168 169 170

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

I
Igor Canadi 已提交
171 172
  // Will delete all the files we don't need anymore
  // If full_scan == true, it will do the full scan of files/ directory
L
Lei Jin 已提交
173
  // and delete all the files that are not referenced from backuped_file_infos__
I
Igor Canadi 已提交
174 175 176 177 178
  void GarbageCollection(bool full_scan);

  // backup state data
  BackupID latest_backup_id_;
  std::map<BackupID, BackupMeta> backups_;
L
Lei Jin 已提交
179
  std::unordered_map<std::string, FileInfo> backuped_file_infos_;
I
Igor Canadi 已提交
180
  std::vector<BackupID> obsolete_backups_;
I
Igor Canadi 已提交
181
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
182 183 184 185 186 187 188 189 190

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

  static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB
};

I
Igor Canadi 已提交
191 192 193 194 195 196 197
// Factory for the default BackupEngine implementation. The returned object is
// allocated with new.
BackupEngine* CreateNewBackupEngine(Env* db_env,
                                    const BackupableDBOptions& options) {
  BackupEngine* engine = new BackupEngineImpl(db_env, options);
  return engine;
}

// Opens (or initializes) the backup directory described by options.
//
// Creates the directory layout, registers one BackupMeta per file found in
// the meta directory, then either discards everything
// (options.destroy_old_data) or loads each backup from its meta file and
// drops the ones that fail to load. Finally it reconciles latest_backup_id_
// with what is actually present, rewrites LATEST_BACKUP and runs a full
// garbage collection. backup_env_ falls back to db_env when
// options.backup_env is null.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options)
    : stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env
                                                : db_env_) {

  // create all the dirs we need
  backup_env_->CreateDirIfMissing(GetAbsolutePath());
  if (options_.share_table_files) {
    backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
  }
  backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
  backup_env_->CreateDirIfMissing(GetBackupMetaDir());

  std::vector<std::string> backup_meta_files;
  backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    // a valid meta file name is exactly the decimal form of a non-zero id
    if (backup_id == 0 || file != std::to_string(backup_id)) {
      // invalid file name, delete that
      backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    backups_.insert(std::make_pair(
        backup_id, BackupMeta(GetBackupMetaFile(backup_id),
                              &backuped_file_infos_, backup_env_)));
  }

  if (options_.destroy_old_data) { // Destroy old data
    for (auto& backup : backups_) {
      backup.second.Delete();
      obsolete_backups_.push_back(backup.first);
    }
    backups_.clear();
    // start from beginning
    latest_backup_id_ = 0;
    // GarbageCollection() will do the actual deletion
  } else { // Load data from storage
    // load the backups if any
    for (auto& backup : backups_) {
      Status s = backup.second.LoadFromFile(options_.backup_dir);
      if (!s.ok()) {
        Log(options_.info_log, "Backup %u corrupted - deleting -- %s",
            backup.first, s.ToString().c_str());
        backup.second.Delete();
        obsolete_backups_.push_back(backup.first);
      }
    }
    // delete obsolete backups from the structure
    for (auto ob : obsolete_backups_) {
      backups_.erase(ob);
    }

    Status s = GetLatestBackupFileContents(&latest_backup_id_);
    // If latest backup file is corrupted or non-existent
    // set latest backup as the biggest backup we have
    // or 0 if we have no backups
    if (!s.ok() ||
        backups_.find(latest_backup_id_) == backups_.end()) {
      auto itr = backups_.end();
      latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
    }
  }

  // delete any backups that claim to be later than latest
  for (auto itr = backups_.upper_bound(latest_backup_id_);
       itr != backups_.end();) {
    itr->second.Delete();
    obsolete_backups_.push_back(itr->first);
    itr = backups_.erase(itr);
  }

  PutLatestBackupFileContents(latest_backup_id_); // Ignore errors
  GarbageCollection(true);
  Log(options_.info_log,
      "Initialized BackupEngine, the latest backup is %u.",
      latest_backup_id_);
}

I
Igor Canadi 已提交
280
// Flushes the info log before the engine is destroyed.
BackupEngineImpl::~BackupEngineImpl() {
  LogFlush(options_.info_log);
}
I
Igor Canadi 已提交
281

I
Igor Canadi 已提交
282
void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) {
283 284 285
  for (auto backup : backups_) {
    if (backup.second.GetSequenceNumber() > sequence_number) {
      Log(options_.info_log,
I
Igor Canadi 已提交
286 287
          "Deleting backup %u because sequence number (%" PRIu64
          ") is newer than %" PRIu64 "",
288 289 290 291 292 293 294 295 296 297 298 299 300 301
          backup.first, backup.second.GetSequenceNumber(), sequence_number);
      backup.second.Delete();
      obsolete_backups_.push_back(backup.first);
    }
  }
  for (auto ob : obsolete_backups_) {
    backups_.erase(backups_.find(ob));
  }
  auto itr = backups_.end();
  latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
  PutLatestBackupFileContents(latest_backup_id_); // Ignore errors
  GarbageCollection(false);
}

I
Igor Canadi 已提交
302
// Creates backup number latest_backup_id_ + 1 from the current state of db.
//
// File deletions are disabled on db while its live files (and, unless
// flush_before_backup is set, its live WAL files) are copied. Files are first
// written into a temporary private directory that is renamed into place once
// every copy succeeded; then the metadata is stored and LATEST_BACKUP is
// updated. On any failure the in-memory entry is dropped and a full garbage
// collection removes whatever was written.
//
// Returns the first error encountered, or OK.
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;
  uint64_t sequence_number = db->GetLatestSequenceNumber();

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
  if (s.ok() && !flush_before_backup) {
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    db->EnableFileDeletions();
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
  auto ret = backups_.insert(std::make_pair(
      new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
                                &backuped_file_infos_, backup_env_)));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup.RecordTimestamp();
  new_backup.SetSequenceNumber(sequence_number);

  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

  // create temporary private dir
  s = backup_env_->CreateDir(
      GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));

  // copy live_files
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    if (!ok) {
      assert(false);
      // Do not return from here: file deletions are still disabled on db and
      // the half-built backup has to be cleaned up below.
      s = Status::Corruption("Can't parse file name. This is very bad");
      break;
    }
    // we should only get sst, manifest and current files here
    assert(type == kTableFile ||
             type == kDescriptorFile ||
             type == kCurrentFile);

    // rules:
    // * if it's kTableFile, than it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    s = BackupFile(new_backup_id,
                   &new_backup,
                   options_.share_table_files && type == kTableFile,
                   db->GetName(),            /* src_dir */
                   live_files[i],            /* src_fname */
                   (type == kDescriptorFile) ? manifest_file_size : 0);
  }

  // copy WAL files
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into backup_dir/files/<new backup>/
      s = BackupFile(new_backup_id,
                     &new_backup,
                     false, /* not shared */
                     db->GetOptions().wal_dir,
                     live_wal_files[i]->PathName());
    }
  }

  // we copied all the files, enable file deletions
  db->EnableFileDeletions();

  if (s.ok()) {
    // move tmp private backup to real backup folder
    s = backup_env_->RenameFile(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)), // tmp
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
  }

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup.StoreToFile(options_.sync);
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
  if (!s.ok()) {
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
    backups_.erase(new_backup_id);
    GarbageCollection(true);
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");
  return s;
}

I
Igor Canadi 已提交
412
// Deletes backups in ascending id order until at most num_backups_to_keep
// remain, then garbage-collects the files they released. Always returns OK.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);
  while (num_backups_to_keep < backups_.size()) {
    // backups_ is an ordered map, so begin() is the smallest backup id
    auto oldest = backups_.begin();
    Log(options_.info_log, "Deleting backup %u", oldest->first);
    oldest->second.Delete();
    obsolete_backups_.push_back(oldest->first);
    backups_.erase(oldest);
  }
  GarbageCollection(false);
  return Status::OK();
}

I
Igor Canadi 已提交
425
// Deletes the backup with the given id and garbage-collects the files it
// released. Returns NotFound if no such backup is registered.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  Log(options_.info_log, "Deleting backup %u", backup_id);
  auto backup = backups_.find(backup_id);
  if (backup == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  backup->second.Delete();
  obsolete_backups_.push_back(backup_id);
  backups_.erase(backup);
  GarbageCollection(false);
  return Status::OK();
}

I
Igor Canadi 已提交
438
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
I
Igor Canadi 已提交
439 440 441 442 443 444 445 446 447
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
    if (!backup.second.Empty()) {
      backup_info->push_back(BackupInfo(
          backup.first, backup.second.GetTimestamp(), backup.second.GetSize()));
    }
  }
}

I
Igor Canadi 已提交
448 449 450
// Restores backup backup_id into db_dir / wal_dir.
//
// Existing children of wal_dir and db_dir are deleted first (errors ignored).
// Each backed-up file is then copied to wal_dir when it parses as kLogFile
// and to db_dir otherwise, and its checksum is compared with the recorded
// one.
//
// Returns NotFound when the backup does not exist or is empty, Corruption
// when a file name cannot be parsed, has no recorded checksum or fails the
// checksum comparison, or the first copy error.
Status BackupEngineImpl::RestoreDBFromBackup(BackupID backup_id,
                                             const std::string& db_dir,
                                             const std::string& wal_dir) {
  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  if (backup.Empty()) {
    return Status::NotFound("Backup not found");
  }

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  // delete log files that might have been already in wal_dir.
  // This is important since they might get replayed to the restored DB,
  // which will then differ from the backuped DB
  std::vector<std::string> delete_children;
  db_env_->GetChildren(wal_dir, &delete_children); // ignore errors
  for (const auto& f : delete_children) {
    db_env_->DeleteFile(wal_dir + "/" + f); // ignore errors
  }
  // Also delete all the db_dir children. This is not so important
  // because obsolete files will be deleted by DBImpl::PurgeObsoleteFiles()
  delete_children.clear();
  db_env_->GetChildren(db_dir, &delete_children); // ignore errors
  for (const auto& f : delete_children) {
    db_env_->DeleteFile(db_dir + "/" + f); // ignore errors
  }

  Status s;
  for (auto& file : backup.GetFiles()) {
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file> or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      return Status::Corruption("Backup corrupted");
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    uint32_t checksum_value;
    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
                 nullptr /* size */, &checksum_value);
    if (!s.ok()) {
      break;
    }

    const auto iter = backuped_file_infos_.find(file);
    assert(iter != backuped_file_infos_.end());
    // In builds without asserts a missing entry must not be dereferenced;
    // without a recorded checksum the copy cannot be verified.
    if (iter == backuped_file_infos_.end() ||
        iter->second.checksum_value != checksum_value) {
      s = Status::Corruption("Checksum check failed");
      break;
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}

// latest backup id is an ASCII representation of latest backup id
I
Igor Canadi 已提交
524
// Reads the LATEST_BACKUP file and parses the backup id stored in it as
// ASCII decimal.
//
// *latest_backup receives the parsed id (0 if nothing could be parsed).
// Returns the open/read error if there was one, and Corruption when the file
// is empty or names a backup whose meta file does not exist.
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
  Status s;
  unique_ptr<SequentialFile> file;
  s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
                                     &file,
                                     EnvOptions());
  if (!s.ok()) {
    return s;
  }

  char buf[11];
  Slice data;
  s = file->Read(10, &data, buf);
  if (!s.ok() || data.size() == 0) {
    return s.ok() ? Status::Corruption("Latest backup file corrupted") : s;
  }

  // The returned slice is not guaranteed to point into buf, so parse a
  // NUL-terminated copy of exactly the bytes that were read.
  const std::string contents(data.data(), data.size());

  *latest_backup = 0;
  sscanf(contents.c_str(), "%u", latest_backup);
  if (backup_env_->FileExists(GetBackupMetaFile(*latest_backup)) == false) {
    s = Status::Corruption("Latest backup file corrupted");
  }
  // propagate the corruption verdict instead of unconditionally reporting OK
  return s;
}

// this operation HAS to be atomic
// writing 4 bytes to the file is atomic alright, but we should *never*
// do something like 1. delete file, 2. write new file
// We write to a tmp file and then atomically rename
I
Igor Canadi 已提交
554
// Writes latest_backup as ASCII decimal followed by a newline to a temporary
// file and then renames it over LATEST_BACKUP, so the real file is replaced
// in one step rather than deleted and rewritten.
//
// Returns the first error from creating, appending to, syncing (only when
// options_.sync is set), closing or renaming the file.
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
  Status s;
  unique_ptr<WritableFile> file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = backup_env_->NewWritableFile(GetLatestBackupFile(true),
                                   &file,
                                   env_options);
  if (!s.ok()) {
    backup_env_->DeleteFile(GetLatestBackupFile(true));
    return s;
  }

  // up to 10 digits for a uint32_t, plus '\n' and the terminating NUL
  char file_contents[16];
  int len = snprintf(file_contents, sizeof(file_contents), "%u\n",
                     latest_backup);
  s = file->Append(Slice(file_contents, len));
  if (s.ok() && options_.sync) {
    // a failed sync must not be followed by the rename below
    s = file->Sync();
  }
  if (s.ok()) {
    s = file->Close();
  }
  if (s.ok()) {
    // atomically replace real file with new tmp
    s = backup_env_->RenameFile(GetLatestBackupFile(true),
                                GetLatestBackupFile(false));
  }
  return s;
}

I
Igor Canadi 已提交
584 585 586 587 588
// Copies src (opened through src_env) to dst (created through dst_env).
//
// At most size_limit bytes are copied; size_limit == 0 means no limit.
// If size is non-null it receives the number of bytes read, and if
// checksum_value is non-null it receives the crc32c of those bytes. When sync
// is true the destination is synced after the last append.
//
// Returns Status::Incomplete("Backup stopped") if the stop flag is observed
// between chunks, otherwise the first open/read/append/sync error, or OK.
Status BackupEngineImpl::CopyFile(const std::string& src,
                                  const std::string& dst, Env* src_env,
                                  Env* dst_env, bool sync, uint64_t* size,
                                  uint32_t* checksum_value,
                                  uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    // never request more than the remaining byte budget
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_file->Read(buffer_to_read, &data, buf.get());
    size_limit -= data.size();

    if (!s.ok()) {
      return s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
                                       data.size());
    }
    s = dst_file->Append(data);
    // an empty read ends the loop, as does exhausting the byte budget
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dst_file->Sync();
  }

  return s;
}

// src_fname will always start with "/"
I
Igor Canadi 已提交
648 649 650 651
// Adds src_dir + src_fname to the backup being built.
//
// Shared files go to shared/<file>; if one already exists there it is not
// copied again and only the checksum of the source is computed. Otherwise the
// file is copied to a temporary location: shared files are renamed into place
// right away, private files stay in the backup's temporary private directory.
// On success the file is registered in *backup under its final relative
// path, together with its size and checksum.
//
// src_fname must start with "/". size_limit == 0 means no limit.
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                                    bool shared, const std::string& src_dir,
                                    const std::string& src_fname,
                                    uint64_t size_limit) {

  assert(src_fname.size() > 0 && src_fname[0] == '/');
  std::string dst_relative = src_fname.substr(1);
  std::string dst_relative_tmp;
  if (shared) {
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
  } else {
    dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
  }
  std::string dst_path = GetAbsolutePath(dst_relative);
  std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);
  Status s;
  // initialized because the GetFileSize() error below is ignored, which
  // would otherwise leave an indeterminate value to be recorded
  uint64_t size = 0;

  // if it's shared, we also need to check if it exists -- if it does,
  // no need to copy it again
  uint32_t checksum_value = 0;
  if (shared && backup_env_->FileExists(dst_path)) {
    backup_env_->GetFileSize(dst_path, &size); // Ignore error
    Log(options_.info_log, "%s already present, calculate checksum",
        src_fname.c_str());
    s = CalculateChecksum(src_dir + src_fname,
                          db_env_,
                          size_limit,
                          &checksum_value);
  } else {
    Log(options_.info_log, "Copying %s", src_fname.c_str());
    s = CopyFile(src_dir + src_fname,
                 dst_path_tmp,
                 db_env_,
                 backup_env_,
                 options_.sync,
                 &size,
                 &checksum_value,
                 size_limit);
    if (s.ok() && shared) {
      s = backup_env_->RenameFile(dst_path_tmp, dst_path);
    }
  }
  if (s.ok()) {
    s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
  }
  return s;
}

I
Igor Canadi 已提交
699 700 701
// Computes the crc32c of the first size_limit bytes of src (the whole file
// when size_limit == 0) and stores it in *checksum_value.
//
// Returns Status::Incomplete("Backup stopped") if the stop flag is observed
// between chunks, otherwise the first open/read error, or OK.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  EnvOptions env_options;
  env_options.use_mmap_writes = false;

  std::unique_ptr<SequentialFile> src_file;
  Status s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    // never request more than the remaining byte budget
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_file->Read(buffer_to_read, &data, buf.get());

    if (!s.ok()) {
      return s;
    }

    size_limit -= data.size();
    *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
    // an empty read ends the loop, as does exhausting the byte budget
  } while (data.size() > 0 && size_limit > 0);

  return s;
}

I
Igor Canadi 已提交
738
void BackupEngineImpl::GarbageCollection(bool full_scan) {
I
Igor Canadi 已提交
739 740
  Log(options_.info_log, "Starting garbage collection");
  std::vector<std::string> to_delete;
L
Lei Jin 已提交
741 742
  for (auto& itr : backuped_file_infos_) {
    if (itr.second.refs == 0) {
I
Igor Canadi 已提交
743 744 745 746 747 748 749
      Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
      Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
          s.ToString().c_str());
      to_delete.push_back(itr.first);
    }
  }
  for (auto& td : to_delete) {
L
Lei Jin 已提交
750
    backuped_file_infos_.erase(td);
I
Igor Canadi 已提交
751 752 753 754 755 756 757 758 759 760 761
  }
  if (!full_scan) {
    // take care of private dirs -- if full_scan == true, then full_scan will
    // take care of them
    for (auto backup_id : obsolete_backups_) {
      std::string private_dir = GetPrivateFileRel(backup_id);
      Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
      Log(options_.info_log, "Deleting private dir %s -- %s",
          private_dir.c_str(), s.ToString().c_str());
    }
  }
762
  obsolete_backups_.clear();
I
Igor Canadi 已提交
763 764 765 766 767 768 769 770 771 772

  if (full_scan) {
    Log(options_.info_log, "Starting full scan garbage collection");
    // delete obsolete shared files
    std::vector<std::string> shared_children;
    backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
                             &shared_children);
    for (auto& child : shared_children) {
      std::string rel_fname = GetSharedFileRel(child);
      // if it's not refcounted, delete it
L
Lei Jin 已提交
773
      if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
I
Igor Canadi 已提交
774 775 776 777 778 779 780 781 782 783 784 785 786 787 788
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s", rel_fname.c_str());
        }
      }
    }

    // delete obsolete private files
    std::vector<std::string> private_children;
    backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                             &private_children);
    for (auto& child : private_children) {
      BackupID backup_id = 0;
I
Igor Canadi 已提交
789
      bool tmp_dir = child.find(".tmp") != std::string::npos;
I
Igor Canadi 已提交
790
      sscanf(child.c_str(), "%u", &backup_id);
I
Igor Canadi 已提交
791 792
      if (!tmp_dir && // if it's tmp_dir, delete it
          (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
I
Igor Canadi 已提交
793 794 795 796 797
        // it's either not a number or it's still alive. continue
        continue;
      }
      // here we have to delete the dir and all its children
      std::string full_private_path =
I
Igor Canadi 已提交
798
          GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir));
I
Igor Canadi 已提交
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
      std::vector<std::string> subchildren;
      backup_env_->GetChildren(full_private_path, &subchildren);
      for (auto& subchild : subchildren) {
        Status s = backup_env_->DeleteFile(full_private_path + subchild);
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s",
              (full_private_path + subchild).c_str());
        }
      }
      // finally delete the private dir
      Status s = backup_env_->DeleteDir(full_private_path);
      Log(options_.info_log, "Deleted dir %s -- %s", full_private_path.c_str(),
          s.ToString().c_str());
    }
  }
}

// ------- BackupMeta class --------

I
Igor Canadi 已提交
818
// Registers file_info as part of this backup.
//
// If the file is not yet present in the shared file_infos_ table it is
// inserted with a reference count of 1; otherwise its reference count is
// incremented, provided the recorded checksum matches file_info's.
//
// Returns Status::Corruption if the table insertion fails or if the checksum
// differs from the one already recorded. On failure nothing is changed:
// size_ and files_ are only updated after the reference has been taken, so
// that Delete() never releases a reference this backup does not hold.
Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) {
  auto itr = file_infos_->find(file_info.filename);
  if (itr == file_infos_->end()) {
    auto ret = file_infos_->insert({file_info.filename, file_info});
    if (!ret.second) {
      // if this happens, something is seriously wrong
      return Status::Corruption("In memory metadata insertion error");
    }
    ret.first->second.refs = 1;
  } else {
    if (itr->second.checksum_value != file_info.checksum_value) {
      return Status::Corruption("Checksum mismatch for existing backup file");
    }
    ++itr->second.refs;  // increase refcount if already present
  }

  // the reference is held now -- record the file as belonging to this backup
  size_ += file_info.size;
  files_.push_back(file_info.filename);

  return Status::OK();
}

I
Igor Canadi 已提交
841
void BackupEngineImpl::BackupMeta::Delete() {
L
Lei Jin 已提交
842 843 844 845
  for (const auto& file : files_) {
    auto itr = file_infos_->find(file);
    assert(itr != file_infos_->end());
    --(itr->second.refs); // decrease refcount
I
Igor Canadi 已提交
846 847 848 849 850 851 852 853 854
  }
  files_.clear();
  // delete meta file
  env_->DeleteFile(meta_filename_);
  timestamp_ = 0;
}

// each backup meta file is of the format:
// <timestamp>
855
// <seq number>
I
Igor Canadi 已提交
856
// <number of files>
L
Lei Jin 已提交
857 858
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
I
Igor Canadi 已提交
859 860
// ...
// TODO: maybe add checksum?
I
Igor Canadi 已提交
861 862
Status BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir) {
I
Igor Canadi 已提交
863 864 865 866 867 868 869 870
  assert(Empty());
  Status s;
  unique_ptr<SequentialFile> backup_meta_file;
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

871 872 873
  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  Slice data;
  s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get());
I
Igor Canadi 已提交
874 875 876 877 878 879 880 881

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
    return s.ok() ? Status::IOError("File size too big") : s;
  }
  buf[data.size()] = 0;

  uint32_t num_files = 0;
  int bytes_read = 0;
I
Igor Canadi 已提交
882
  sscanf(data.data(), "%" PRId64 "%n", &timestamp_, &bytes_read);
I
Igor Canadi 已提交
883
  data.remove_prefix(bytes_read + 1); // +1 for '\n'
I
Igor Canadi 已提交
884
  sscanf(data.data(), "%" PRIu64 "%n", &sequence_number_, &bytes_read);
885
  data.remove_prefix(bytes_read + 1); // +1 for '\n'
I
Igor Canadi 已提交
886 887 888
  sscanf(data.data(), "%u%n", &num_files, &bytes_read);
  data.remove_prefix(bytes_read + 1); // +1 for '\n'

L
Lei Jin 已提交
889
  std::vector<FileInfo> files;
890

I
Igor Canadi 已提交
891
  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
L
Lei Jin 已提交
892 893 894
    auto line = GetSliceUntil(&data, '\n');
    std::string filename = GetSliceUntil(&line, ' ').ToString();

I
Igor Canadi 已提交
895 896
    uint64_t size;
    s = env_->GetFileSize(backup_dir + "/" + filename, &size);
L
Lei Jin 已提交
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914

    if (line.empty()) {
      return Status::Corruption("File checksum is missing");
    }

    uint32_t checksum_value = 0;
    if (line.starts_with("crc32 ")) {
      line.remove_prefix(6);
      sscanf(line.data(), "%u", &checksum_value);
      if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
                 line.size() - 1) != 0) {
        return Status::Corruption("Invalid checksum value");
      }
    } else {
      return Status::Corruption("Unknown checksum type");
    }

    files.emplace_back(filename, size, checksum_value);
915 916 917
  }

  if (s.ok()) {
L
Lei Jin 已提交
918 919 920 921 922
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
923
    }
I
Igor Canadi 已提交
924 925 926 927 928
  }

  return s;
}

I
Igor Canadi 已提交
929
// Writes this backup's metadata (timestamp, sequence number, file count and
// one "<file> crc32 <value>" line per file) to meta_filename_.
//
// The contents are first formatted into a buffer of
// max_backup_meta_file_size_ bytes, then written to meta_filename_ + ".tmp",
// optionally synced (when sync is true), closed, and finally renamed over
// meta_filename_.
//
// Returns Status::IOError if the formatted contents do not fit into the
// buffer -- in that case no file is created -- or the first error reported
// by the env_ / file operations.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
  int len = 0;
  const int buf_size = static_cast<int>(max_backup_meta_file_size_);

  // snprintf returns the length it *wanted* to write. Blindly adding that to
  // len would move len past the buffer, turning the next "buf_size - len"
  // into a huge size_t and the next write into a heap overflow. Accept an
  // snprintf result only if the text (plus its NUL) really fit.
  auto fits = [&len, buf_size](int written) -> bool {
    if (written < 0 || written >= buf_size - len) {
      return false;
    }
    len += written;
    return true;
  };

  bool ok = fits(snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_));
  ok = ok && fits(snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
                           sequence_number_));
  ok = ok && fits(snprintf(buf.get() + len, buf_size - len, "%zu\n",
                           files_.size()));
  for (const auto& file : files_) {
    if (!ok) {
      break;
    }
    const auto& iter = file_infos_->find(file);

    assert(iter != file_infos_->end());
    // use crc32 for now, switch to something else if needed
    ok = fits(snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
                       file.c_str(), iter->second.checksum_value));
  }
  if (!ok) {
    return Status::IOError("Backup meta file too big");
  }

  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  s = backup_meta_file->Append(Slice(buf.get(), static_cast<size_t>(len)));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    // publish the new meta file only after it has been written completely
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}

// --- BackupableDB methods --------

970
// Wraps db and creates the BackupEngineImpl (on db's Env) that all backup
// calls of this class are forwarded to. When options.share_table_files is
// set, the engine is asked to delete every backup newer than the DB's latest
// sequence number.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
    : StackableDB(db),
      backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
  if (!options.share_table_files) {
    return;
  }
  backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
}
I
Igor Canadi 已提交
977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997

// Destroys the backup engine that the constructor allocated with new.
BackupableDB::~BackupableDB() {
  delete backup_engine_;
}

// Forwards to the backup engine's CreateNewBackup, passing this object as the
// DB to back up, and returns the engine's status.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  return backup_engine_->CreateNewBackup(this, flush_before_backup);
}

// Forwards to the backup engine's GetBackupInfo with the caller's
// backup_info vector.
void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

// Forwards to the backup engine's PurgeOldBackups and returns its status.
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}

// Forwards to the backup engine's DeleteBackup and returns its status.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}

I
Igor Canadi 已提交
998 999 1000 1001
// Forwards to the backup engine's StopBackup.
void BackupableDB::StopBackup() {
  backup_engine_->StopBackup();
}

I
Igor Canadi 已提交
1002 1003 1004 1005
// --- RestoreBackupableDB methods ------

// Creates the BackupEngineImpl, built from db_env and options, that the
// methods of this class forward to.
RestoreBackupableDB::RestoreBackupableDB(
    Env* db_env, const BackupableDBOptions& options)
    : backup_engine_(new BackupEngineImpl(db_env, options)) {
}
I
Igor Canadi 已提交
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037

// Destroys the backup engine held in backup_engine_.
RestoreBackupableDB::~RestoreBackupableDB() {
  delete backup_engine_;
}

// Forwards to the backup engine's GetBackupInfo with the caller's
// backup_info vector.
void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

// Forwards to the backup engine's RestoreDBFromBackup with the given backup
// id, DB directory and WAL directory, and returns its status.
Status RestoreBackupableDB::RestoreDBFromBackup(BackupID backup_id,
                                                const std::string& db_dir,
                                                const std::string& wal_dir) {
  return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir);
}

// Forwards to the backup engine's RestoreDBFromLatestBackup with the given
// DB directory and WAL directory, and returns its status.
Status
RestoreBackupableDB::RestoreDBFromLatestBackup(const std::string& db_dir,
                                               const std::string& wal_dir) {
  return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir);
}

// Forwards to the backup engine's PurgeOldBackups and returns its status.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}

// Forwards to the backup engine's DeleteBackup and returns its status.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}

}  // namespace rocksdb