backupable_db.cc 49.8 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10 11
#ifndef ROCKSDB_LITE

12
#include "rocksdb/utilities/backupable_db.h"
I
Igor Canadi 已提交
13 14
#include "db/filename.h"
#include "util/coding.h"
L
Lei Jin 已提交
15
#include "util/crc32c.h"
I
Igor Canadi 已提交
16
#include "util/logging.h"
S
sdong 已提交
17
#include "util/string_util.h"
I
Igor Canadi 已提交
18 19
#include "rocksdb/transaction_log.h"

L
liuhuahang 已提交
20
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
21
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
22
#endif
I
Igor Canadi 已提交
23 24

#include <inttypes.h>
25
#include <stdlib.h>
I
Igor Canadi 已提交
26 27 28
#include <algorithm>
#include <vector>
#include <map>
I
Igor Canadi 已提交
29
#include <sstream>
I
Igor Canadi 已提交
30 31
#include <string>
#include <limits>
I
Igor Canadi 已提交
32
#include <atomic>
33
#include <unordered_map>
I
Igor Canadi 已提交
34 35 36

namespace rocksdb {

I
Igor Canadi 已提交
37
namespace {
L
Lei Jin 已提交
38
class BackupRateLimiter {
I
Igor Canadi 已提交
39
 public:
L
Lei Jin 已提交
40 41
  BackupRateLimiter(Env* env, uint64_t max_bytes_per_second,
                   uint64_t bytes_per_check)
I
Igor Canadi 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
      : env_(env),
        max_bytes_per_second_(max_bytes_per_second),
        bytes_per_check_(bytes_per_check),
        micros_start_time_(env->NowMicros()),
        bytes_since_start_(0) {}

  void ReportAndWait(uint64_t bytes_since_last_call) {
    bytes_since_start_ += bytes_since_last_call;
    if (bytes_since_start_ < bytes_per_check_) {
      // not enough bytes to be rate-limited
      return;
    }

    uint64_t now = env_->NowMicros();
    uint64_t interval = now - micros_start_time_;
    uint64_t should_take_micros =
        (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;

    if (should_take_micros > interval) {
61 62
      env_->SleepForMicroseconds(
          static_cast<int>(should_take_micros - interval));
I
Igor Canadi 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
      now = env_->NowMicros();
    }
    // reset interval
    micros_start_time_ = now;
    bytes_since_start_ = 0;
  }

 private:
  Env* env_;
  uint64_t max_bytes_per_second_;
  uint64_t bytes_per_check_;
  uint64_t micros_start_time_;
  uint64_t bytes_since_start_;
  static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};
}  // namespace

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
// Counter bookkeeping for backup attempts.
void BackupStatistics::IncrementNumberSuccessBackup() {
  ++number_success_backup;
}

void BackupStatistics::IncrementNumberFailBackup() {
  ++number_fail_backup;
}

// Accessors for the counters above.
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  const uint32_t successes = number_success_backup;
  return successes;
}

uint32_t BackupStatistics::GetNumberFailBackup() const {
  const uint32_t failures = number_fail_backup;
  return failures;
}

// Renders the success/failure counters as one human-readable line.
// Buffer sizing: the format has 35 fixed characters and each %u can expand to
// 10 digits, so the worst case needs 35 + 20 + 1 (NUL) = 56 bytes. The old
// 50-byte buffer silently truncated once the two counters together had more
// than 14 digits.
std::string BackupStatistics::ToString() const {
  char result[64];
  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
           GetNumberSuccessBackup(), GetNumberFailBackup());
  return result;
}

I
Igor Canadi 已提交
101
// Logs every BackupableDBOptions field, one per line, to `logger`.
void BackupableDBOptions::Dump(Logger* logger) const {
  Log(logger, "        Options.backup_dir: %s", backup_dir.c_str());
  // "%p" requires a void* argument; passing another pointer type through
  // varargs is undefined behavior, so cast explicitly.
  Log(logger, "        Options.backup_env: %p",
      static_cast<void*>(backup_env));
  Log(logger, " Options.share_table_files: %d",
      static_cast<int>(share_table_files));
  // Previously missing from the dump even though it selects which shared
  // directory ("shared" vs "shared_checksum") the engine uses.
  Log(logger, "Options.share_files_with_checksum: %d",
      static_cast<int>(share_files_with_checksum));
  Log(logger, "          Options.info_log: %p", static_cast<void*>(info_log));
  Log(logger, "              Options.sync: %d", static_cast<int>(sync));
  Log(logger, "  Options.destroy_old_data: %d",
      static_cast<int>(destroy_old_data));
  Log(logger, "  Options.backup_log_files: %d",
      static_cast<int>(backup_log_files));
  Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
  Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit);
}

I
Igor Canadi 已提交
116 117
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
I
Igor Canadi 已提交
118
 public:
I
Igor Canadi 已提交
119 120
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
                   bool read_only = false);
I
Igor Canadi 已提交
121
  ~BackupEngineImpl();
I
Igor Sugak 已提交
122 123 124 125
  Status CreateNewBackup(DB* db, bool flush_before_backup = false) override;
  Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
  Status DeleteBackup(BackupID backup_id) override;
  void StopBackup() override {
I
Igor Canadi 已提交
126 127
    stop_backup_.store(true, std::memory_order_release);
  }
I
Igor Sugak 已提交
128 129 130 131 132 133 134 135 136 137
  Status GarbageCollect() override;

  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
  Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override;
  Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override {
138 139
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir,
                               restore_options);
I
Igor Canadi 已提交
140 141 142
  }

 private:
143 144
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);

L
Lei Jin 已提交
145 146 147 148
  struct FileInfo {
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

149 150 151
    FileInfo(const FileInfo&) = delete;
    FileInfo& operator=(const FileInfo&) = delete;

L
Lei Jin 已提交
152 153 154
    int refs;
    const std::string filename;
    const uint64_t size;
155
    const uint32_t checksum_value;
L
Lei Jin 已提交
156 157
  };

I
Igor Canadi 已提交
158 159 160
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
161 162
        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
        Env* env)
I
Igor Canadi 已提交
163
      : timestamp_(0), size_(0), meta_filename_(meta_filename),
L
Lei Jin 已提交
164
        file_infos_(file_infos), env_(env) {}
I
Igor Canadi 已提交
165

166 167 168
    BackupMeta(const BackupMeta&) = delete;
    BackupMeta& operator=(const BackupMeta&) = delete;

I
Igor Canadi 已提交
169 170 171 172 173 174 175 176 177 178 179
    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
180
    uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
181 182 183 184 185 186
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
I
Igor Canadi 已提交
187

188
    Status AddFile(std::shared_ptr<FileInfo> file_info);
L
Lei Jin 已提交
189

I
Igor Canadi 已提交
190
    void Delete(bool delete_meta = true);
I
Igor Canadi 已提交
191 192 193 194 195

    bool Empty() {
      return files_.empty();
    }

196
    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
197 198 199
      auto it = file_infos_->find(filename);
      if (it == file_infos_->end())
        return nullptr;
200
      return it->second;
201 202
    }

203
    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
I
Igor Canadi 已提交
204 205 206 207 208 209
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

I
Igor Canadi 已提交
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
    std::string GetInfoString() {
      std::ostringstream ss;
      ss << "Timestamp: " << timestamp_ << std::endl;
      char human_size[16];
      AppendHumanBytes(size_, human_size, sizeof(human_size));
      ss << "Size: " << human_size << std::endl;
      ss << "Files:" << std::endl;
      for (const auto& file : files_) {
        AppendHumanBytes(file->size, human_size, sizeof(human_size));
        ss << file->filename << ", size " << human_size << ", refs "
           << file->refs << std::endl;
      }
      return ss.str();
    }

I
Igor Canadi 已提交
225 226
   private:
    int64_t timestamp_;
227 228 229
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
I
Igor Canadi 已提交
230 231 232
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
233 234
    std::vector<std::shared_ptr<FileInfo>> files_;
    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
I
Igor Canadi 已提交
235
    Env* env_;
236

237 238
    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  };  // BackupMeta
I
Igor Canadi 已提交
239 240 241 242 243 244 245 246 247

  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.size() == 0 || relative_path[0] != '/');
    return options_.backup_dir + "/" + relative_path;
  }
  inline std::string GetPrivateDirRel() const {
    return "private";
  }
248 249 250
  inline std::string GetSharedChecksumDirRel() const {
    return "shared_checksum";
  }
I
Igor Canadi 已提交
251
  inline std::string GetPrivateFileRel(BackupID backup_id,
I
Igor Canadi 已提交
252 253
                                       bool tmp = false,
                                       const std::string& file = "") const {
I
Igor Canadi 已提交
254
    assert(file.size() == 0 || file[0] != '/');
S
sdong 已提交
255
    return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id) +
I
Igor Canadi 已提交
256
           (tmp ? ".tmp" : "") + "/" + file;
I
Igor Canadi 已提交
257
  }
I
Igor Canadi 已提交
258 259
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
I
Igor Canadi 已提交
260
    assert(file.size() == 0 || file[0] != '/');
I
Igor Canadi 已提交
261
    return "shared/" + file + (tmp ? ".tmp" : "");
I
Igor Canadi 已提交
262
  }
263 264 265 266 267 268 269 270 271 272 273
  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
                                                  bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
    return GetSharedChecksumDirRel() + "/" + file + (tmp ? ".tmp" : "");
  }
  inline std::string GetSharedFileWithChecksum(const std::string& file,
                                               const uint32_t checksum_value,
                                               const uint64_t file_size) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    return file_copy.insert(file_copy.find_last_of('.'),
S
sdong 已提交
274 275
                            "_" + rocksdb::ToString(checksum_value) + "_" +
                                rocksdb::ToString(file_size));
276 277 278 279 280 281 282 283
  }
  inline std::string GetFileFromChecksumFile(const std::string& file) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    size_t first_underscore = file_copy.find_first_of('_');
    return file_copy.erase(first_underscore,
                           file_copy.find_last_of('.') - first_underscore);
  }
I
Igor Canadi 已提交
284 285 286 287 288 289 290
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
  }
  inline std::string GetBackupMetaDir() const {
    return GetAbsolutePath("meta");
  }
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
S
sdong 已提交
291
    return GetBackupMetaDir() + "/" + rocksdb::ToString(backup_id);
I
Igor Canadi 已提交
292 293 294 295 296 297 298 299 300 301
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
L
Lei Jin 已提交
302
                  BackupRateLimiter* rate_limiter,
I
Igor Canadi 已提交
303
                  uint64_t* size = nullptr,
L
Lei Jin 已提交
304
                  uint32_t* checksum_value = nullptr,
I
Igor Canadi 已提交
305 306 307 308 309 310
                  uint64_t size_limit = 0);
  // if size_limit == 0, there is no size limit, copy everything
  Status BackupFile(BackupID backup_id,
                    BackupMeta* backup,
                    bool shared,
                    const std::string& src_dir,
311
                    const std::string& src_fname,  // starts with "/"
L
Lei Jin 已提交
312
                    BackupRateLimiter* rate_limiter,
313 314
                    uint64_t size_limit = 0,
                    bool shared_checksum = false);
L
Lei Jin 已提交
315 316 317 318 319 320

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

I
Igor Canadi 已提交
321 322
  // backup state data
  BackupID latest_backup_id_;
323 324 325 326 327
  std::map<BackupID, unique_ptr<BackupMeta>> backups_;
  std::map<BackupID,
           std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
  std::unordered_map<std::string,
                     std::shared_ptr<FileInfo>> backuped_file_infos_;
I
Igor Canadi 已提交
328
  std::atomic<bool> stop_backup_;
I
Igor Canadi 已提交
329 330 331 332 333 334

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

I
Igor Canadi 已提交
335 336 337 338 339 340
  // directories
  unique_ptr<Directory> backup_directory_;
  unique_ptr<Directory> shared_directory_;
  unique_ptr<Directory> meta_directory_;
  unique_ptr<Directory> private_directory_;

I
Igor Canadi 已提交
341 342
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
I
Igor Canadi 已提交
343
  bool read_only_;
344
  BackupStatistics backup_statistics_;
I
Igor Canadi 已提交
345 346
};

347 348
// Factory returning a heap-allocated, read-write backup engine; the caller
// owns the returned pointer.
BackupEngine* BackupEngine::NewBackupEngine(Env* db_env,
                                            const BackupableDBOptions& options) {
  BackupEngine* engine = new BackupEngineImpl(db_env, options);
  return engine;
}

H
Hasnain Lakhani 已提交
352 353 354 355 356 357 358
// Creates a read-write backup engine and stores it in *backup_engine_ptr
// (caller takes ownership). Construction itself reports no errors, so this
// always returns OK.
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
                          BackupEngine** backup_engine_ptr) {
  BackupEngineImpl* engine = new BackupEngineImpl(env, options);
  *backup_engine_ptr = engine;
  return Status::OK();
}

I
Igor Canadi 已提交
359
// Sets up the on-disk layout (unless read-only), discovers existing backup
// meta files, loads them (or wipes everything when destroy_old_data is set),
// determines the latest valid backup id and discards backups newer than it.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options,
                                   bool read_only)
    : stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {
  if (read_only_) {
    Log(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // Create a directory if needed and open a handle to it. Errors are
    // deliberately ignored here.
    auto prepare_dir = [this](const std::string& path,
                              unique_ptr<Directory>* handle) {
      backup_env_->CreateDirIfMissing(path);
      backup_env_->NewDirectory(path, handle);
    };
    prepare_dir(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      const std::string shared_rel = options_.share_files_with_checksum
                                         ? GetSharedFileWithChecksumRel()
                                         : GetSharedFileRel();
      prepare_dir(GetAbsolutePath(shared_rel), &shared_directory_);
    }
    prepare_dir(GetAbsolutePath(GetPrivateDirRel()), &private_directory_);
    prepare_dir(GetBackupMetaDir(), &meta_directory_);
  }

  // Every meta file whose name is a canonical non-zero decimal id becomes an
  // (unloaded) entry in backups_; anything else is removed when writable.
  std::vector<std::string> meta_children;
  backup_env_->GetChildren(GetBackupMetaDir(), &meta_children);
  for (const auto& name : meta_children) {
    if (name == "." || name == "..") {
      continue;
    }
    Log(options_.info_log, "Detected backup %s", name.c_str());
    BackupID id = 0;
    sscanf(name.c_str(), "%u", &id);
    const bool well_formed = id != 0 && name == rocksdb::ToString(id);
    if (!well_formed) {
      if (!read_only_) {
        Log(options_.info_log, "Unrecognized meta file %s, deleting",
            name.c_str());
        // invalid file name, delete that
        backup_env_->DeleteFile(GetBackupMetaDir() + "/" + name);
      }
      continue;
    }
    assert(backups_.find(id) == backups_.end());
    backups_.insert(std::make_pair(
        id, unique_ptr<BackupMeta>(new BackupMeta(
                GetBackupMetaFile(id), &backuped_file_infos_, backup_env_))));
  }

  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    Log(options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    PurgeOldBackups(0);
    (void) GarbageCollect();
    // start from beginning
    latest_backup_id_ = 0;
  } else {  // Load data from storage
    for (auto& entry : backups_) {
      Status load_status = entry.second->LoadFromFile(options_.backup_dir);
      if (load_status.ok()) {
        Log(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
            entry.first, entry.second->GetInfoString().c_str());
      } else {
        Log(options_.info_log, "Backup %u corrupted -- %s", entry.first,
            load_status.ToString().c_str());
        corrupt_backups_.insert(std::make_pair(
            entry.first, std::make_pair(load_status, std::move(entry.second))));
      }
    }
    // Drop the (now moved-from) entries of backups that failed to load.
    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(corrupt.first);
    }

    Status latest_status = GetLatestBackupFileContents(&latest_backup_id_);
    // If LATEST_BACKUP is unreadable or names a backup we don't have, fall
    // back to the highest loaded backup id, or 0 when none are left.
    const bool latest_known =
        latest_status.ok() && backups_.count(latest_backup_id_) != 0;
    if (!latest_known) {
      latest_backup_id_ = backups_.empty() ? 0 : backups_.rbegin()->first;
    }
  }

  Log(options_.info_log, "Latest backup is %u", latest_backup_id_);

  // delete any backups that claim to be later than latest
  std::vector<BackupID> stale_ids;
  for (auto it = backups_.lower_bound(latest_backup_id_ + 1);
       it != backups_.end(); ++it) {
    Log(options_.info_log,
        "Found backup claiming to be later than latest: %" PRIu32, it->first);
    stale_ids.push_back(it->first);
  }
  for (BackupID stale_id : stale_ids) {
    if (read_only_) {
      // Forget the backup in memory only (Delete(false)).
      auto pos = backups_.find(stale_id);
      // We just found it couple of lines earlier!
      assert(pos != backups_.end());
      pos->second->Delete(false);
      backups_.erase(pos);
    } else {
      DeleteBackup(stale_id);
    }
  }

  if (!read_only_) {
    PutLatestBackupFileContents(latest_backup_id_);  // Ignore errors
  }
  Log(options_.info_log, "Initialized BackupEngine");
}

I
Igor Canadi 已提交
490
// Flushes the info log before the engine is destroyed.
BackupEngineImpl::~BackupEngineImpl() {
  LogFlush(options_.info_log);
}
I
Igor Canadi 已提交
491

I
Igor Canadi 已提交
492
// Creates backup latest_backup_id_ + 1 of `db`:
//  1. disables file deletions and collects the live files (plus the WAL files
//     when not flushing and options_.backup_log_files is set);
//  2. copies them into a temporary private directory via BackupFile();
//  3. re-enables file deletions, renames the temporary directory, stores the
//     backup meta file and updates LATEST_BACKUP;
//  4. when options_.sync is set, fsyncs the affected directories.
// On any failure the partial backup is deleted and garbage collected, and the
// failing status is returned.
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
  assert(!read_only_);
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;
  uint64_t sequence_number = db->GetLatestSequenceNumber();

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
  if (s.ok() && !flush_before_backup && options_.backup_log_files) {
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    db->EnableFileDeletions(false);
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
  auto ret = backups_.insert(std::make_pair(
      new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                         GetBackupMetaFile(new_backup_id),
                         &backuped_file_infos_, backup_env_))));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup->RecordTimestamp();
  new_backup->SetSequenceNumber(sequence_number);

  auto start_backup = backup_env_->NowMicros();

  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

  // create temporary private dir
  s = backup_env_->CreateDir(
      GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));

  unique_ptr<BackupRateLimiter> rate_limiter;
  if (options_.backup_rate_limit > 0) {
    // Copy in chunks of about a tenth of a second's byte budget. Clamp to at
    // least 1: integer division gives 0 for limits below 10, which would set
    // a zero copy-buffer size and a zero bytes-per-check threshold.
    copy_file_buffer_size_ = std::max<size_t>(
        1, static_cast<size_t>(options_.backup_rate_limit / 10));
    rate_limiter.reset(new BackupRateLimiter(
        db_env_, options_.backup_rate_limit, copy_file_buffer_size_));
  }

  // copy live_files
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    if (!ok) {
      assert(false);
      // Do not return directly: that would leave file deletions disabled on
      // the DB and the half-built backup registered in backups_. The common
      // failure path below handles both.
      s = Status::Corruption("Can't parse file name. This is very bad");
      break;
    }
    // we should only get sst, manifest and current files here
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile);

    // rules:
    // * if it's kTableFile, then it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    s = BackupFile(new_backup_id,
                   new_backup.get(),
                   options_.share_table_files && type == kTableFile,
                   db->GetName(),            /* src_dir */
                   live_files[i],            /* src_fname */
                   rate_limiter.get(),
                   (type == kDescriptorFile) ? manifest_file_size : 0,
                   options_.share_files_with_checksum && type == kTableFile);
  }

  // copy WAL files
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into backup_dir/files/<new backup>/
      s = BackupFile(new_backup_id,
                     new_backup.get(),
                     false, /* not shared */
                     db->GetOptions().wal_dir,
                     live_wal_files[i]->PathName(),
                     rate_limiter.get());
    }
  }

  // we copied all the files, enable file deletions
  db->EnableFileDeletions(false);

  if (s.ok()) {
    // move tmp private backup to real backup folder
    Log(options_.info_log,
        "Moving tmp backup directory to the real one: %s -> %s\n",
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)).c_str(),
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)).c_str());
    s = backup_env_->RenameFile(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)),  // tmp
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
  }

  auto backup_time = backup_env_->NowMicros() - start_backup;

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup->StoreToFile(options_.sync);
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
  if (s.ok() && options_.sync) {
    unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      backup_private_directory->Fsync();
    }
    if (private_directory_ != nullptr) {
      private_directory_->Fsync();
    }
    if (meta_directory_ != nullptr) {
      meta_directory_->Fsync();
    }
    if (shared_directory_ != nullptr) {
      shared_directory_->Fsync();
    }
    if (backup_directory_ != nullptr) {
      backup_directory_->Fsync();
    }
  }

  if (!s.ok()) {
    backup_statistics_.IncrementNumberFailBackup();
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
    Log(options_.info_log, "Backup Statistics %s\n",
        backup_statistics_.ToString().c_str());
    // delete files that we might have already written
    DeleteBackup(new_backup_id);
    GarbageCollect();
    return s;
  }
  backup_statistics_.IncrementNumberSuccessBackup();

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");

  // bytes / microsecond == decimal MB / second; dividing by 1.048576 turns
  // that into MiB/s. Guard against a zero elapsed time (division by zero).
  double backup_speed = 0.0;
  if (backup_time > 0) {
    backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
  }
  Log(options_.info_log, "Backup number of files: %u",
      new_backup->GetNumberFiles());
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  Log(options_.info_log, "Backup size: %s", human_size);
  Log(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time);
  Log(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  Log(options_.info_log, "Backup Statistics %s",
      backup_statistics_.ToString().c_str());
  return s;
}

I
Igor Canadi 已提交
662
// Deletes the oldest backups (lowest ids, since backups_ is an ordered map)
// until at most `num_backups_to_keep` remain. Always returns OK.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(!read_only_);
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);
  const size_t excess = backups_.size() > num_backups_to_keep
                            ? backups_.size() - num_backups_to_keep
                            : 0;
  // Collect ids first; DeleteBackup() erases from backups_.
  std::vector<BackupID> victims;
  victims.reserve(excess);
  for (auto it = backups_.begin(); victims.size() < excess; ++it) {
    victims.push_back(it->first);
  }
  for (BackupID id : victims) {
    DeleteBackup(id);
  }
  return Status::OK();
}

I
Igor Canadi 已提交
678
// Removes backup `backup_id` (healthy or corrupt), then deletes every backed
// up file whose reference count is 0 and tries to remove the backup's
// private directory. Returns NotFound if the id is unknown, OK otherwise.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  assert(!read_only_);
  Log(options_.info_log, "Deleting backup %u", backup_id);

  auto live = backups_.find(backup_id);
  if (live == backups_.end()) {
    auto corrupt = corrupt_backups_.find(backup_id);
    if (corrupt == corrupt_backups_.end()) {
      return Status::NotFound("Backup not found");
    }
    corrupt->second.second->Delete();
    corrupt_backups_.erase(corrupt);
  } else {
    live->second->Delete();
    backups_.erase(live);
  }

  // Delete unreferenced files; erase their entries after the scan so the
  // map is not modified while being iterated.
  std::vector<std::string> unreferenced;
  for (const auto& entry : backuped_file_infos_) {
    if (entry.second->refs != 0) {
      continue;
    }
    Status del = backup_env_->DeleteFile(GetAbsolutePath(entry.first));
    Log(options_.info_log, "Deleting %s -- %s", entry.first.c_str(),
        del.ToString().c_str());
    unreferenced.push_back(entry.first);
  }
  for (const auto& name : unreferenced) {
    backuped_file_infos_.erase(name);
  }

  // take care of private dirs -- GarbageCollect() will take care of them
  // if they are not empty
  const std::string private_dir = GetPrivateFileRel(backup_id);
  Status dir_status = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
  Log(options_.info_log, "Deleting private dir %s -- %s",
      private_dir.c_str(), dir_status.ToString().c_str());
  return Status::OK();
}

I
Igor Canadi 已提交
716
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
I
Igor Canadi 已提交
717 718
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
719
    if (!backup.second->Empty()) {
720
        backup_info->push_back(BackupInfo(
721 722 723
            backup.first, backup.second->GetTimestamp(),
            backup.second->GetSize(),
            backup.second->GetNumberFiles()));
I
Igor Canadi 已提交
724 725 726 727
    }
  }
}

H
Hasnain Lakhani 已提交
728 729 730 731 732 733 734 735 736
void
BackupEngineImpl::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
  corrupt_backup_ids->reserve(corrupt_backups_.size());
  for (auto& backup : corrupt_backups_) {
    corrupt_backup_ids->push_back(backup.first);
  }
}

737 738 739
// Restores backup `backup_id` into `db_dir` (log files go to `wal_dir`).
//
// Fails with the stored status for a backup known to be corrupt, and with
// NotFound for an unknown or empty backup. Existing files are cleared first:
// with keep_log_files, db_dir is emptied except for log files and archived
// logs are moved back into wal_dir (a failed move aborts the restore);
// otherwise wal_dir, its archive dir and db_dir are emptied. Each backup file
// is then copied (optionally rate limited) and its crc32c compared with the
// one recorded in the backup; a mismatch yields Corruption.
Status BackupEngineImpl::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  auto corrupt_itr = corrupt_backups_.find(backup_id);
  if (corrupt_itr != corrupt_backups_.end()) {
    return corrupt_itr->second.first;
  }

  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  if (backup->Empty()) {
    return Status::NotFound("Backup not found");
  }

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);
  Log(options_.info_log, "keep_log_files: %d\n",
      static_cast<int>(restore_options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  if (restore_options.keep_log_files) {
    // delete files in db_dir, but keep all the log files
    DeleteChildren(db_dir, 1 << kLogFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archive_files;
    db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
    for (const auto& f : archive_files) {
      uint64_t number;
      FileType type;
      bool ok = ParseFileName(f, &number, &type);
      if (ok && type == kLogFile) {
        Log(options_.info_log, "Moving log file from archive/ to wal_dir: %s",
            f.c_str());
        Status s =
            db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
        if (!s.ok()) {
          // if we can't move log file from archive_dir to wal_dir,
          // we should fail, since it might mean data loss
          return s;
        }
      }
    }
  } else {
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  }

  unique_ptr<BackupRateLimiter> rate_limiter;
  if (options_.restore_rate_limit > 0) {
    // Copy in chunks of a tenth of the per-second budget; the same chunk size
    // is the limiter's bytes_per_check. For limits under 10 bytes/s the
    // division yields 0, and a zero-sized buffer makes CopyFile read zero
    // bytes, so the checksum comparison below would fail spuriously. Clamp.
    // NOTE(review): this overwrites the member, so the restore buffer size
    // also applies to later copies made by this engine.
    copy_file_buffer_size_ = options_.restore_rate_limit / 10;
    if (copy_file_buffer_size_ == 0) {
      copy_file_buffer_size_ = 1;
    }
    rate_limiter.reset(new BackupRateLimiter(db_env_,
          options_.restore_rate_limit, copy_file_buffer_size_));
  }
  Status s;
  for (const auto& file_info : backup->GetFiles()) {
    const std::string &file = file_info->filename;
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file>, shared_checksum/<file_crc32_size>
    // or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // if the file was in shared_checksum, extract the real file name
    // in this case the file is <number>_<checksum>_<size>.<type>
    if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
      dst = GetFileFromChecksumFile(dst);
    }

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      return Status::Corruption("Backup corrupted");
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    uint32_t checksum_value;
    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
                 rate_limiter.get(), nullptr /* size */, &checksum_value);
    if (!s.ok()) {
      break;
    }

    if (file_info->checksum_value != checksum_value) {
      s = Status::Corruption("Checksum check failed");
      break;
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}

// The latest-backup file stores the latest backup id as ASCII decimal text.
I
Igor Canadi 已提交
844
// Reads the latest-backup file and parses the decimal backup id it holds
// into *latest_backup (left 0 if nothing parses).
// Returns the I/O error if the file cannot be opened or read, and
// Corruption if it is empty or names a backup whose meta file is missing.
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
  Status s;
  unique_ptr<SequentialFile> file;
  s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
                                     &file,
                                     EnvOptions());
  if (!s.ok()) {
    return s;
  }

  char buf[10];
  Slice data;
  s = file->Read(sizeof(buf), &data, buf);
  if (!s.ok() || data.size() == 0) {
    return s.ok() ? Status::Corruption("Latest backup file corrupted") : s;
  }
  // SequentialFile::Read() may return a slice that does not point into
  // `buf`, so parse from an owned, NUL-terminated copy of the bytes read.
  const std::string contents = data.ToString();

  *latest_backup = 0;
  sscanf(contents.c_str(), "%u", latest_backup);
  if (!backup_env_->FileExists(GetBackupMetaFile(*latest_backup))) {
    // The id does not name an existing backup: report it instead of
    // silently returning OK.
    return Status::Corruption("Latest backup file corrupted");
  }
  return Status::OK();
}

// This operation HAS to be atomic: we must *never* do something like
// 1. delete file, 2. write new file, because a crash in between would leave
// no valid latest-backup file. Instead we write the new contents to a tmp
// file and then atomically rename it over the real one.
I
Igor Canadi 已提交
874
// Publishes `latest_backup` as the latest backup id. The decimal id plus
// '\n' is written to the tmp latest-backup file, synced when options_.sync
// is set, closed, and then renamed over the real file. Returns the first
// error encountered; if the tmp file cannot be created it is removed.
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
  assert(!read_only_);
  Status s;
  unique_ptr<WritableFile> file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = backup_env_->NewWritableFile(GetLatestBackupFile(true),
                                   &file,
                                   env_options);
  if (!s.ok()) {
    backup_env_->DeleteFile(GetLatestBackupFile(true));
    return s;
  }

  // Room for up to 10 decimal digits of a uint32_t, '\n', and the NUL that
  // snprintf always writes. A 10-byte buffer overflowed for ids of nine or
  // more digits.
  char file_contents[12];
  int len = snprintf(file_contents, sizeof(file_contents), "%u\n",
                     latest_backup);
  s = file->Append(Slice(file_contents, len));
  if (s.ok() && options_.sync) {
    // Don't publish the new id if the requested sync failed.
    s = file->Sync();
  }
  if (s.ok()) {
    s = file->Close();
  }
  if (s.ok()) {
    // atomically replace real file with new tmp
    s = backup_env_->RenameFile(GetLatestBackupFile(true),
                                GetLatestBackupFile(false));
  }
  return s;
}

L
Lei Jin 已提交
905 906 907 908 909 910 911
// Copies `src` (read through `src_env`) to `dst` (written through `dst_env`),
// reading at most `size_limit` bytes (0 means no limit) in chunks of
// copy_file_buffer_size_.
//  - *size (if non-null) receives the number of bytes copied.
//  - *checksum_value (if non-null) receives the crc32c of those bytes.
//  - rate_limiter (if non-null) is told about every chunk.
//  - sync requests a Sync() of `dst` before it is closed.
// Returns Incomplete if stop_backup_ is raised mid-copy, otherwise the first
// open/read/append/sync/close error, or OK.
Status BackupEngineImpl::CopyFile(
    const std::string& src,
    const std::string& dst, Env* src_env,
    Env* dst_env, bool sync,
    BackupRateLimiter* rate_limiter, uint64_t* size,
    uint32_t* checksum_value,
    uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  env_options.use_os_buffer = false;
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_file->Read(buffer_to_read, &data, buf.get());
    if (!s.ok()) {
      return s;
    }
    size_limit -= data.size();

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
                                       data.size());
    }
    s = dst_file->Append(data);
    if (rate_limiter != nullptr) {
      rate_limiter->ReportAndWait(data.size());
    }
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dst_file->Sync();
  }
  if (s.ok()) {
    // Close explicitly so errors flushing buffered data are reported instead
    // of being dropped when dst_file is destroyed.
    s = dst_file->Close();
  }

  return s;
}

// src_fname will always start with "/"
I
Igor Canadi 已提交
975 976 977
// Adds file `src_dir + src_fname` to backup `backup_id`, reading at most
// `size_limit` bytes (0 = no limit).
//  - shared && shared_checksum: stored under the shared-checksum dir with
//    its crc32c and size encoded in the name; not copied again if present.
//  - shared: stored under the shared dir; an existing file that a backup
//    already references is reused, an unreferenced one is overwritten.
//  - otherwise: copied to the backup's private tmp path.
// Shared files are copied to a tmp name and renamed into place. On success
// the file is recorded in `backup` via AddFile().
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                                    bool shared, const std::string& src_dir,
                                    const std::string& src_fname,
                                    BackupRateLimiter* rate_limiter,
                                    uint64_t size_limit,
                                    bool shared_checksum) {
  assert(src_fname.size() > 0 && src_fname[0] == '/');
  std::string dst_relative = src_fname.substr(1);
  std::string dst_relative_tmp;
  Status s;
  // Zero-initialized: the "already present and referenced" branch below
  // ignores GetFileSize() errors. Without this, a failed stat would record an
  // indeterminate size in the backup metadata.
  uint64_t size = 0;
  uint32_t checksum_value = 0;

  if (shared && shared_checksum) {
    // add checksum and file length to the file name
    s = CalculateChecksum(src_dir + src_fname,
                          db_env_,
                          size_limit,
                          &checksum_value);
    if (s.ok()) {
      s = db_env_->GetFileSize(src_dir + src_fname, &size);
    }
    if (!s.ok()) {
      return s;
    }
    dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_value,
                                             size);
    dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
    dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  } else if (shared) {
    dst_relative_tmp = GetSharedFileRel(dst_relative, true);
    dst_relative = GetSharedFileRel(dst_relative, false);
  } else {
    dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
    dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
  }
  std::string dst_path = GetAbsolutePath(dst_relative);
  std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);

  // if it's shared, we also need to check if it exists -- if it does,
  // no need to copy it again
  bool need_to_copy = true;
  if (shared && backup_env_->FileExists(dst_path)) {
    need_to_copy = false;
    if (shared_checksum) {
      Log(options_.info_log,
          "%s already present, with checksum %u and size %" PRIu64,
          src_fname.c_str(), checksum_value, size);
    } else if (backuped_file_infos_.find(dst_relative) ==
               backuped_file_infos_.end()) {
      // file already exists, but it's not referenced by any backup. overwrite
      // the file
      Log(options_.info_log,
          "%s already present, but not referenced by any backup. We will "
          "overwrite the file.",
          src_fname.c_str());
      need_to_copy = true;
      backup_env_->DeleteFile(dst_path);
    } else {
      // the file is present and referenced by a backup
      db_env_->GetFileSize(src_dir + src_fname, &size);  // Ignore error
      Log(options_.info_log, "%s already present, calculate checksum",
          src_fname.c_str());
      s = CalculateChecksum(src_dir + src_fname, db_env_, size_limit,
                            &checksum_value);
    }
  }
  if (need_to_copy) {
    Log(options_.info_log, "Copying %s to %s", src_fname.c_str(),
        dst_path_tmp.c_str());
    s = CopyFile(src_dir + src_fname,
                 dst_path_tmp,
                 db_env_,
                 backup_env_,
                 options_.sync,
                 rate_limiter,
                 &size,
                 &checksum_value,
                 size_limit);
    if (s.ok() && shared) {
      s = backup_env_->RenameFile(dst_path_tmp, dst_path);
    }
  }
  if (s.ok()) {
    s = backup->AddFile(std::make_shared<FileInfo>(
          dst_relative, size, checksum_value));
  }
  return s;
}

I
Igor Canadi 已提交
1066 1067 1068
// Computes the crc32c of at most `size_limit` bytes from the start of `src`
// (0 means the whole file), read through `src_env` in chunks of
// copy_file_buffer_size_. *checksum_value is reset to 0 before anything
// else. Returns Incomplete if stop_backup_ is raised mid-way, otherwise the
// first open/read error, or OK.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;
  uint64_t remaining = (size_limit == 0)
                           ? std::numeric_limits<uint64_t>::max()
                           : size_limit;

  EnvOptions read_options;
  read_options.use_mmap_writes = false;
  read_options.use_os_buffer = false;

  std::unique_ptr<SequentialFile> input;
  Status s = src_env->NewSequentialFile(src, &input, read_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<char[]> scratch(new char[copy_file_buffer_size_]);
  Slice chunk;
  for (;;) {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    const size_t want = (copy_file_buffer_size_ < remaining)
                            ? copy_file_buffer_size_
                            : remaining;
    s = input->Read(want, &chunk, scratch.get());
    if (!s.ok()) {
      return s;
    }
    remaining -= chunk.size();
    *checksum_value =
        crc32c::Extend(*checksum_value, chunk.data(), chunk.size());
    // Stop at end of file or once the byte budget is spent.
    if (chunk.size() == 0 || remaining == 0) {
      break;
    }
  }
  return s;
}

1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122
// Deletes every entry of `dir` except files whose name parses as a DB file
// whose FileType bit (1 << type) is set in `file_type_filter`. All errors
// are ignored.
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) {
  std::vector<std::string> entries;
  db_env_->GetChildren(dir, &entries);  // ignore errors

  for (const auto& name : entries) {
    uint64_t number;
    FileType type;
    const bool protected_type = ParseFileName(name, &number, &type) &&
                                (file_type_filter & (1 << type)) != 0;
    if (!protected_type) {
      db_env_->DeleteFile(dir + "/" + name);  // ignore errors
    }
  }
}

H
Hasnain Lakhani 已提交
1123
// Removes backup-directory contents that no live backup needs:
//  - entries of the shared dir that are absent from backuped_file_infos_ or
//    whose refcount is zero (they are also dropped from that map), and
//  - private dirs whose name contains ".tmp", or whose numeric id is not a
//    key of backups_ (the files inside are deleted, then the dir).
// Per-file failures are ignored or only logged; always returns OK.
Status BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);
  Log(options_.info_log, "Starting garbage collection");

  // delete obsolete shared files
  std::vector<std::string> shared_entries;
  backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
                           &shared_entries);
  for (const auto& entry : shared_entries) {
    const std::string rel_path = GetSharedFileRel(entry);
    auto info = backuped_file_infos_.find(rel_path);
    const bool still_referenced =
        info != backuped_file_infos_.end() && info->second->refs != 0;
    if (still_referenced) {
      continue;
    }
    // this might be a directory, but DeleteFile will just fail in that
    // case, so we're good
    if (backup_env_->DeleteFile(GetAbsolutePath(rel_path)).ok()) {
      Log(options_.info_log, "Deleted %s", rel_path.c_str());
    }
    backuped_file_infos_.erase(rel_path);
  }

  // delete obsolete private files
  std::vector<std::string> private_entries;
  backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                           &private_entries);
  for (const auto& entry : private_entries) {
    BackupID backup_id = 0;
    const bool is_tmp = entry.find(".tmp") != std::string::npos;
    sscanf(entry.c_str(), "%u", &backup_id);
    // A tmp dir always goes. Otherwise keep it when the name is not a number
    // (backup_id stays 0) or the backup is still alive.
    const bool keep =
        !is_tmp && (backup_id == 0 || backups_.count(backup_id) != 0);
    if (keep) {
      continue;
    }
    // here we have to delete the dir and all its children
    const std::string dir_path =
        GetAbsolutePath(GetPrivateFileRel(backup_id, is_tmp));
    std::vector<std::string> files;
    backup_env_->GetChildren(dir_path, &files);
    for (const auto& file : files) {
      const std::string file_path = dir_path + file;
      if (backup_env_->DeleteFile(file_path).ok()) {
        Log(options_.info_log, "Deleted %s", file_path.c_str());
      }
    }
    // finally delete the private dir
    Status dir_status = backup_env_->DeleteDir(dir_path);
    Log(options_.info_log, "Deleted dir %s -- %s", dir_path.c_str(),
        dir_status.ToString().c_str());
  }

  return Status::OK();
}

// ------- BackupMeta class --------

1183 1184 1185
// Registers `file_info` in this backup. A file new to the shared
// file_infos_ map is inserted with refcount 1. An already-known file must
// carry the same checksum (otherwise Corruption) and has its refcount
// bumped. Either way the shared entry is appended to files_ and the size
// is added to size_.
Status BackupEngineImpl::BackupMeta::AddFile(
    std::shared_ptr<FileInfo> file_info) {
  auto known = file_infos_->find(file_info->filename);
  if (known != file_infos_->end()) {
    if (known->second->checksum_value != file_info->checksum_value) {
      return Status::Corruption(
          "Checksum mismatch for existing backup file. Delete old backups and "
          "try again.");
    }
    ++known->second->refs;  // increase refcount if already present
  } else {
    auto inserted = file_infos_->insert({file_info->filename, file_info});
    if (!inserted.second) {
      // if this happens, something is seriously wrong
      return Status::Corruption("In memory metadata insertion error");
    }
    known = inserted.first;
    known->second->refs = 1;
  }

  size_ += file_info->size;
  files_.push_back(known->second);
  return Status::OK();
}

I
Igor Canadi 已提交
1210
void BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
L
Lei Jin 已提交
1211
  for (const auto& file : files_) {
1212
    --file->refs;  // decrease refcount
I
Igor Canadi 已提交
1213 1214 1215
  }
  files_.clear();
  // delete meta file
I
Igor Canadi 已提交
1216 1217 1218
  if (delete_meta) {
    env_->DeleteFile(meta_filename_);
  }
I
Igor Canadi 已提交
1219 1220 1221 1222 1223
  timestamp_ = 0;
}

// each backup meta file is of the format:
// <timestamp>
// <seq number>
// <number of files>
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
I
Igor Canadi 已提交
1229 1230
// Parses meta_filename_ (format described above) into this (empty) meta.
// Every listed file must carry a "crc32 <value>" checksum. Sizes come from
// an already-known FileInfo or, failing that, from the file under
// `backup_dir`. Returns Corruption for a file at least
// max_backup_meta_file_size_ bytes long, a truncated header, a missing,
// unknown or malformed checksum, trailing data, or a checksum that conflicts
// with an already-registered file. I/O errors are returned as-is.
Status BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir) {
  assert(Empty());
  Status s;
  unique_ptr<SequentialFile> backup_meta_file;
  s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  if (!s.ok()) {
    return s;
  }

  unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  Slice data;
  s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get());

  if (!s.ok() || data.size() == max_backup_meta_file_size_) {
    return s.ok() ? Status::Corruption("File size too big") : s;
  }
  // NOTE(review): this NUL terminator, which the strto*() calls below rely
  // on, assumes Read() filled `buf` (data.data() == buf.get()).
  buf[data.size()] = 0;

  // Drops one parsed header field plus its '\n' from the front of `data`.
  // `next` is where strto*() stopped. If it reached the end of `data` there
  // is no '\n' left to skip: the header is truncated. Returning false here
  // stops remove_prefix() from running past the end of the slice.
  auto consume_field = [&data](const char* next) -> bool {
    if (next >= data.data() + data.size()) {
      return false;
    }
    data.remove_prefix(next - data.data() + 1);  // +1 for '\n'
    return true;
  };

  uint32_t num_files = 0;
  char *next;
  timestamp_ = strtoull(data.data(), &next, 10);
  bool header_ok = consume_field(next);
  if (header_ok) {
    sequence_number_ = strtoull(data.data(), &next, 10);
    header_ok = consume_field(next);
  }
  if (header_ok) {
    num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
    header_ok = consume_field(next);
  }
  if (!header_ok) {
    return Status::Corruption("Truncated header in backup meta file " +
                              meta_filename_);
  }

  std::vector<std::shared_ptr<FileInfo>> files;

  Slice checksum_prefix("crc32 ");

  for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
    auto line = GetSliceUntil(&data, '\n');
    std::string filename = GetSliceUntil(&line, ' ').ToString();

    uint64_t size;
    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
    if (file_info) {
      size = file_info->size;
    } else {
      s = env_->GetFileSize(backup_dir + "/" + filename, &size);
      if (!s.ok()) {
        return s;
      }
    }

    if (line.empty()) {
      return Status::Corruption("File checksum is missing for " + filename +
                                " in " + meta_filename_);
    }

    uint32_t checksum_value = 0;
    if (line.starts_with(checksum_prefix)) {
      line.remove_prefix(checksum_prefix.size());
      checksum_value = static_cast<uint32_t>(
          strtoul(line.data(), nullptr, 10));
      if (line != rocksdb::ToString(checksum_value)) {
        return Status::Corruption("Invalid checksum value for " + filename +
                                  " in " + meta_filename_);
      }
    } else {
      return Status::Corruption("Unknown checksum type for " + filename +
                                " in " + meta_filename_);
    }

    files.emplace_back(new FileInfo(filename, size, checksum_value));
  }

  if (s.ok() && data.size() > 0) {
    // file has to be read completely. if not, we count it as corruption
    s = Status::Corruption("Tailing data in backup meta file in " +
                           meta_filename_);
  }

  if (s.ok()) {
    files_.reserve(files.size());
    for (const auto& file_info : files) {
      s = AddFile(file_info);
      if (!s.ok()) {
        break;
      }
    }
  }

  return s;
}

I
Igor Canadi 已提交
1317
// Serializes this meta (timestamp, sequence number, file count, then one
// "<file> crc32 <value>" line per file) to meta_filename_ + ".tmp". The tmp
// file is optionally synced, closed, and renamed over meta_filename_.
// Returns NotSupported if the contents would not fit in
// max_backup_meta_file_size_ bytes; otherwise the first I/O error, or OK.
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  const size_t buf_size = static_cast<size_t>(max_backup_meta_file_size_);
  unique_ptr<char[]> buf(new char[buf_size]);
  size_t len = 0;
  // snprintf() returns the length it *wanted* to write. Accept a piece only
  // if it fit entirely, with room left for the NUL. Otherwise `len` would
  // pass the end of the buffer and `buf_size - len` would wrap around,
  // turning the next snprintf into an overflow. Such a file could not be
  // loaded anyway: LoadFromFile() rejects files of max_backup_meta_file_size_
  // bytes or more.
  auto fits = [&len, buf_size](int written) -> bool {
    if (written < 0 || static_cast<size_t>(written) >= buf_size - len) {
      return false;
    }
    len += static_cast<size_t>(written);
    return true;
  };

  bool ok = fits(snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_));
  ok = ok && fits(snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
                           sequence_number_));
  ok = ok && fits(snprintf(buf.get() + len, buf_size - len,
                           "%" ROCKSDB_PRIszt "\n", files_.size()));
  for (const auto& file : files_) {
    if (!ok) {
      break;
    }
    // use crc32 for now, switch to something else if needed
    ok = fits(snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
                       file->filename.c_str(), file->checksum_value));
  }
  if (!ok) {
    return Status::NotSupported(
        "Backup meta file exceeds max_backup_meta_file_size_",
        meta_filename_);
  }

  s = backup_meta_file->Append(Slice(buf.get(), len));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}

I
Igor Canadi 已提交
1353 1354 1355
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
 public:
I
Igor Canadi 已提交
1356 1357 1358
  BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
      : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}

I
Igor Canadi 已提交
1359 1360
  virtual ~BackupEngineReadOnlyImpl() {}

I
Igor Sugak 已提交
1361
  virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
I
Igor Canadi 已提交
1362 1363 1364
    backup_engine_->GetBackupInfo(backup_info);
  }

I
Igor Sugak 已提交
1365 1366
  virtual void GetCorruptedBackups(
      std::vector<BackupID>* corrupt_backup_ids) override {
H
Hasnain Lakhani 已提交
1367 1368 1369
    backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
  }

I
Igor Canadi 已提交
1370 1371
  virtual Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1372
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1373 1374 1375 1376 1377 1378
    return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
                                               restore_options);
  }

  virtual Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
I
Igor Sugak 已提交
1379
      const RestoreOptions& restore_options = RestoreOptions()) override {
I
Igor Canadi 已提交
1380 1381 1382 1383 1384
    return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                     restore_options);
  }

 private:
I
Igor Canadi 已提交
1385
  std::unique_ptr<BackupEngineImpl> backup_engine_;
I
Igor Canadi 已提交
1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396
};

// Factory for a read-only engine. Requesting destroy_old_data is a caller
// bug: it asserts in debug builds and yields nullptr otherwise.
BackupEngineReadOnly* BackupEngineReadOnly::NewReadOnlyBackupEngine(
    Env* db_env, const BackupableDBOptions& options) {
  assert(!options.destroy_old_data);
  return options.destroy_old_data
             ? nullptr
             : new BackupEngineReadOnlyImpl(db_env, options);
}

I
Igor Canadi 已提交
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407
// Creates a read-only engine into *backup_engine_ptr. destroy_old_data is
// rejected (assert in debug builds, InvalidArgument otherwise) because a
// read-only engine must not modify backups.
Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
                                  BackupEngineReadOnly** backup_engine_ptr) {
  if (!options.destroy_old_data) {
    *backup_engine_ptr = new BackupEngineReadOnlyImpl(env, options);
    return Status::OK();
  }
  assert(false);
  return Status::InvalidArgument(
      "Can't destroy old data with ReadOnly BackupEngine");
}

I
Igor Canadi 已提交
1408 1409
// --- BackupableDB methods --------

1410
// Wraps `db` via StackableDB and creates a backup engine bound to the
// DB's own Env.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
    : StackableDB(db),
      backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
}
I
Igor Canadi 已提交
1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425

// Frees the backup engine allocated in the constructor.
BackupableDB::~BackupableDB() { delete backup_engine_; }

// Backs up this DB through the engine, optionally flushing first.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  Status result = backup_engine_->CreateNewBackup(this, flush_before_backup);
  return result;
}

void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1426 1427 1428 1429 1430
void
BackupableDB::GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) {
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

I
Igor Canadi 已提交
1431 1432 1433 1434 1435 1436 1437 1438
// Forwards to the backup engine.
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  Status result = backup_engine_->PurgeOldBackups(num_backups_to_keep);
  return result;
}

// Forwards to the backup engine.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  Status result = backup_engine_->DeleteBackup(backup_id);
  return result;
}

I
Igor Canadi 已提交
1439 1440 1441 1442
void BackupableDB::StopBackup() {
  backup_engine_->StopBackup();
}

H
Hasnain Lakhani 已提交
1443 1444 1445 1446
// Forwards to the backup engine.
Status BackupableDB::GarbageCollect() {
  Status result = backup_engine_->GarbageCollect();
  return result;
}

I
Igor Canadi 已提交
1447 1448 1449 1450
// --- RestoreBackupableDB methods ------

// Creates a backup engine bound to `db_env` for restore and maintenance.
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                         const BackupableDBOptions& options)
    : backup_engine_(new BackupEngineImpl(db_env, options)) {
}
I
Igor Canadi 已提交
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461

// Frees the backup engine allocated in the constructor.
RestoreBackupableDB::~RestoreBackupableDB() { delete backup_engine_; }

void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}

H
Hasnain Lakhani 已提交
1462 1463 1464 1465 1466
void RestoreBackupableDB::GetCorruptedBackups(
    std::vector<BackupID>* corrupt_backup_ids) {
  backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}

1467 1468 1469 1470 1471
// Forwards to the backup engine.
Status RestoreBackupableDB::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  Status result = backup_engine_->RestoreDBFromBackup(backup_id, db_dir,
                                                      wal_dir, restore_options);
  return result;
}

1474 1475 1476 1477 1478
// Forwards to the backup engine.
Status RestoreBackupableDB::RestoreDBFromLatestBackup(
    const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  Status result = backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
                                                            restore_options);
  return result;
}

// Forwards to the backup engine.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  Status result = backup_engine_->PurgeOldBackups(num_backups_to_keep);
  return result;
}

// Forwards to the backup engine.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  Status result = backup_engine_->DeleteBackup(backup_id);
  return result;
}

H
Hasnain Lakhani 已提交
1489 1490 1491 1492
// Forwards to the backup engine.
Status RestoreBackupableDB::GarbageCollect() {
  Status result = backup_engine_->GarbageCollect();
  return result;
}

I
Igor Canadi 已提交
1493
}  // namespace rocksdb
I
Igor Canadi 已提交
1494 1495

#endif  // ROCKSDB_LITE