pegasus_server_impl.cpp 95.2 KB
Newer Older
Q
qinzuoyan 已提交
1 2 3 4 5 6 7 8
// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "pegasus_server_impl.h"
#include <pegasus_key_schema.h>
#include <pegasus_value_schema.h>
#include <pegasus_utils.h>
9
#include <dsn/utility/smart_pointers.h>
Q
qinzuoyan 已提交
10
#include <dsn/utility/utils.h>
11
#include <dsn/utility/filesystem.h>
Q
qinzuoyan 已提交
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/table.h>
#include <boost/lexical_cast.hpp>
#include <algorithm>

namespace pegasus {
namespace server {

// Although the INCR operator has been removed, its task code must be kept for compatibility:
// existing mutation log entries may still contain it. Even though such entries need not be
// applied to rocksdb, they may still be deserialized.
DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_INCR, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)

// Builds the directory name used for the checkpoint of the given decree,
// i.e. "checkpoint.<decree>" with the decree printed in decimal.
//
// Uses std::to_string instead of an unbounded sprintf into a fixed-size stack buffer;
// the produced text is the same.
static std::string chkpt_get_dir_name(int64_t decree)
{
    std::string dir_name("checkpoint.");
    dir_name += std::to_string(decree);
    return dir_name;
}

// Parses a checkpoint directory name of the form "checkpoint.<decree>".
//
// Returns true only if a decree could be scanned from `name` AND re-formatting that decree
// with chkpt_get_dir_name() reproduces `name` exactly, so names with trailing garbage or a
// non-canonical number are rejected.
//
// NOTE: `decree` is written as soon as the scan succeeds, even if the canonical-name
// comparison then fails; callers must only use it when true is returned.
static bool chkpt_init_from_dir(const char *name, int64_t &decree)
{
    // SCNd64 is the scanf-family conversion for int64_t (PRId64 is for the printf family).
    if (sscanf(name, "checkpoint.%" SCNd64 "", &decree) != 1) {
        return false;
    }
    return std::string(name) == chkpt_get_dir_name(decree);
}

39 40
// Constructs the storage app for replica `r`.
//
// The constructor only reads configuration (section "pegasus.server"), fills in the rocksdb
// options, and registers the per-partition perf counters; it does not open the database
// (_db stays nullptr and _is_open stays false here).
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
    : dsn::apps::rrdb_service(r),
      _db(nullptr),
      _is_open(false),
      _value_schema_version(0),
      _last_durable_decree(0),
      _physical_error(0),
      _is_checkpointing(false)
{
    _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
    _gpid = get_gpid();
    _verbose_log = dsn_config_get_value_bool("pegasus.server",
                                             "rocksdb_verbose_log",
                                             false,
                                             "print verbose log for debugging, default is false");

    // thresholds used to detect abnormal reads; 0 means the check is disabled
    _abnormal_get_time_threshold_ns = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_get_time_threshold_ns",
        0,
        "rocksdb_abnormal_get_time_threshold_ns, default is 0, means no check");
    _abnormal_get_size_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_get_size_threshold",
        0,
        "rocksdb_abnormal_get_size_threshold, default is 0, means no check");
    _abnormal_multi_get_time_threshold_ns = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_time_threshold_ns",
        0,
        "rocksdb_abnormal_multi_get_time_threshold_ns, default is 0, means no check");
    _abnormal_multi_get_size_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_size_threshold",
        0,
        "rocksdb_abnormal_multi_get_size_threshold, default is 0, means no check");
    _abnormal_multi_get_iterate_count_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_iterate_count_threshold",
        0,
        "rocksdb_abnormal_multi_get_iterate_count_threshold, default is 0, means no check");

    // init db options

    // rocksdb default: snappy
    std::string compression_str = dsn_config_get_value_string(
        "pegasus.server",
        "rocksdb_compression_type",
        "none",
        "rocksdb options.compression, default none. Supported: snappy, none.");
    if (compression_str == "none") {
        _db_opts.compression = rocksdb::kNoCompression;
    } else if (compression_str == "snappy") {
        _db_opts.compression = rocksdb::kSnappyCompression;
    } else {
        // The condition must be an explicit `false`: passing the message as the first argument
        // makes the (non-null) string literal the condition, so the assertion never fires.
        dassert(false, "unsupported compression type: %s", compression_str.c_str());
    }

    // rocksdb default: 4MB
    _db_opts.write_buffer_size = static_cast<size_t>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_write_buffer_size",
                                    64 * 1024 * 1024,
                                    "rocksdb options.write_buffer_size, default 64MB"));

    // rocksdb default: 2
    _db_opts.max_write_buffer_number = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_write_buffer_number",
                                    4,
                                    "rocksdb options.max_write_buffer_number, default 4"));

    // rocksdb default: 1
    _db_opts.max_background_flushes = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_background_flushes",
                                    1,
                                    "rocksdb options.max_background_flushes, default 1"));

    // rocksdb default: 1
    _db_opts.max_background_compactions = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_background_compactions",
                                    2,
                                    "rocksdb options.max_background_compactions, default 2"));

    // rocksdb default: 7
    _db_opts.num_levels = static_cast<int>(dsn_config_get_value_uint64(
        "pegasus.server", "rocksdb_num_levels", 6, "rocksdb options.num_levels, default 6"));

    // rocksdb default: 2MB
    _db_opts.target_file_size_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_target_file_size_base",
                                    64 * 1024 * 1024,
                                    "rocksdb options.target_file_size_base, default 64MB");

    // rocksdb default: 10MB
    _db_opts.max_bytes_for_level_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_bytes_for_level_base",
                                    10 * 64 * 1024 * 1024,
                                    "rocksdb options.max_bytes_for_level_base, default 640MB");

    // Deprecated
    //    // rocksdb default: 10
    //    _db_opts.max_grandparent_overlap_factor = (int)dsn_config_get_value_uint64(
    //        "pegasus.server",
    //        "rocksdb_max_grandparent_overlap_factor",
    //        10,
    //        "rocksdb options.max_grandparent_overlap_factor, default 10");

    // rocksdb default: 4
    _db_opts.level0_file_num_compaction_trigger = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_level0_file_num_compaction_trigger",
                                    10,
                                    "rocksdb options.level0_file_num_compaction_trigger, 10"));

    // rocksdb default: 20
    _db_opts.level0_slowdown_writes_trigger = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_level0_slowdown_writes_trigger",
                                    30,
                                    "rocksdb options.level0_slowdown_writes_trigger, default 30"));

    // rocksdb default: 24
    _db_opts.level0_stop_writes_trigger = static_cast<int>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_level0_stop_writes_trigger",
                                    60,
                                    "rocksdb options.level0_stop_writes_trigger, default 60"));

    // disable table block cache, default: false
    if (dsn_config_get_value_bool("pegasus.server",
                                  "rocksdb_disable_table_block_cache",
                                  false,
                                  "rocksdb options.disable_table_block_cache, default false")) {
        rocksdb::BlockBasedTableOptions table_options;
        table_options.no_block_cache = true;
        table_options.block_restart_interval = 4;
        _db_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
    }

    // disable write ahead logging as replication handles logging instead now
    _wt_opts.disableWAL = true;

    // get the checkpoint reserve options.
    _checkpoint_reserve_min_count = static_cast<uint32_t>(dsn_config_get_value_uint64(
        "pegasus.server", "checkpoint_reserve_min_count", 3, "checkpoint_reserve_min_count"));
    _checkpoint_reserve_time_seconds = static_cast<uint32_t>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "checkpoint_reserve_time_seconds",
                                    3600,
                                    "checkpoint_reserve_time_seconds"));

    // get the interval (in seconds) between two updates of the sst size statistics.
    _updating_rocksdb_sstsize_interval_seconds = static_cast<uint32_t>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "updating_rocksdb_sstsize_interval_seconds",
                                    600,
                                    "updating_rocksdb_sstsize_interval_seconds"));

    // TODO: move the qps/latency counters and it's statistics to replication_app_base layer
    char str_gpid[128], buf[256];
    snprintf(str_gpid,
             sizeof(str_gpid),
             "%d.%d",
             _gpid.get_app_id(),
             _gpid.get_partition_index());

    // Formats "<metric>@<app_id>.<partition_index>" into `buf` and returns it. The buffer is
    // reused by every call, so each result must be consumed before the next call.
    auto counter_name = [&](const char *metric) -> const char * {
        snprintf(buf, sizeof(buf), "%s@%s", metric, str_gpid);
        return buf;
    };

    // register the perf counters: request rates
    _pfc_get_qps.init_app_counter("app.pegasus",
                                  counter_name("get_qps"),
                                  COUNTER_TYPE_RATE,
                                  "statistic the qps of GET request");
    _pfc_multi_get_qps.init_app_counter("app.pegasus",
                                        counter_name("multi_get_qps"),
                                        COUNTER_TYPE_RATE,
                                        "statistic the qps of MULTI_GET request");
    _pfc_scan_qps.init_app_counter("app.pegasus",
                                   counter_name("scan_qps"),
                                   COUNTER_TYPE_RATE,
                                   "statistic the qps of SCAN request");
    _pfc_put_qps.init_app_counter("app.pegasus",
                                  counter_name("put_qps"),
                                  COUNTER_TYPE_RATE,
                                  "statistic the qps of PUT request");
    _pfc_multi_put_qps.init_app_counter("app.pegasus",
                                        counter_name("multi_put_qps"),
                                        COUNTER_TYPE_RATE,
                                        "statistic the qps of MULTI_PUT request");
    _pfc_remove_qps.init_app_counter("app.pegasus",
                                     counter_name("remove_qps"),
                                     COUNTER_TYPE_RATE,
                                     "statistic the qps of REMOVE request");
    _pfc_multi_remove_qps.init_app_counter("app.pegasus",
                                           counter_name("multi_remove_qps"),
                                           COUNTER_TYPE_RATE,
                                           "statistic the qps of MULTI_REMOVE request");

    // request latencies
    _pfc_get_latency.init_app_counter("app.pegasus",
                                      counter_name("get_latency"),
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of GET request");
    _pfc_multi_get_latency.init_app_counter("app.pegasus",
                                            counter_name("multi_get_latency"),
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_GET request");
    _pfc_scan_latency.init_app_counter("app.pegasus",
                                       counter_name("scan_latency"),
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of SCAN request");
    _pfc_put_latency.init_app_counter("app.pegasus",
                                      counter_name("put_latency"),
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of PUT request");
    _pfc_multi_put_latency.init_app_counter("app.pegasus",
                                            counter_name("multi_put_latency"),
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_PUT request");
    _pfc_remove_latency.init_app_counter("app.pegasus",
                                         counter_name("remove_latency"),
                                         COUNTER_TYPE_NUMBER_PERCENTILES,
                                         "statistic the latency of REMOVE request");
    _pfc_multi_remove_latency.init_app_counter("app.pegasus",
                                               counter_name("multi_remove_latency"),
                                               COUNTER_TYPE_NUMBER_PERCENTILES,
                                               "statistic the latency of MULTI_REMOVE request");

    // recent read statistics
    _pfc_recent_expire_count.init_app_counter("app.pegasus",
                                              counter_name("recent.expire.count"),
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent expired value read count");
    _pfc_recent_filter_count.init_app_counter("app.pegasus",
                                              counter_name("recent.filter.count"),
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent filtered value read count");
    _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
                                                counter_name("recent.abnormal.count"),
                                                COUNTER_TYPE_VOLATILE_NUMBER,
                                                "statistic the recent abnormal read count");

    // on-disk storage statistics
    _pfc_sst_count.init_app_counter("app.pegasus",
                                    counter_name("disk.storage.sst.count"),
                                    COUNTER_TYPE_NUMBER,
                                    "statistic the count of sstable files");
    _pfc_sst_size.init_app_counter("app.pegasus",
                                   counter_name("disk.storage.sst(MB)"),
                                   COUNTER_TYPE_NUMBER,
                                   "statistic the size of sstable files");

    updating_rocksdb_sstsize();
}

void pegasus_server_impl::parse_checkpoints()
{
    std::vector<std::string> dirs;
309
    ::dsn::utils::filesystem::get_subdirectories(data_dir(), dirs, false);
Q
qinzuoyan 已提交
310 311 312 313 314 315

    ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);

    _checkpoints.clear();
    for (auto &d : dirs) {
        int64_t ci;
316
        std::string d1 = d.substr(data_dir().size() + 1);
Q
qinzuoyan 已提交
317 318 319
        if (chkpt_init_from_dir(d1.c_str(), ci)) {
            _checkpoints.push_back(ci);
        } else if (d1.find("checkpoint") != std::string::npos) {
320
            ddebug("%s: invalid checkpoint directory %s, remove it", replica_name(), d.c_str());
Q
qinzuoyan 已提交
321 322
            ::dsn::utils::filesystem::remove_path(d);
            if (!::dsn::utils::filesystem::remove_path(d)) {
323 324
                derror(
                    "%s: remove invalid checkpoint directory %s failed", replica_name(), d.c_str());
Q
qinzuoyan 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
            }
        }
    }

    if (!_checkpoints.empty()) {
        std::sort(_checkpoints.begin(), _checkpoints.end());
        set_last_durable_decree(_checkpoints.back());
    } else {
        set_last_durable_decree(0);
    }
}

void pegasus_server_impl::gc_checkpoints()
{
    std::deque<int64_t> temp_list;
    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        if (_checkpoints.size() <= _checkpoint_reserve_min_count)
            return;
        temp_list = _checkpoints;
    }

    // find the max checkpoint which can be deleted
    int64_t max_del_d = -1;
    uint64_t current_time = dsn_now_ms() / 1000;
    for (int i = 0; i < temp_list.size(); ++i) {
        if (i + _checkpoint_reserve_min_count >= temp_list.size())
            break;
        int64_t d = temp_list[i];
        // we check last write time of "CURRENT" instead of directory, because the directory's
        // last write time may be updated by previous incompleted garbage collection.
356
        auto cpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(d));
Q
qinzuoyan 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
        auto current_file = ::dsn::utils::filesystem::path_combine(cpt_dir, "CURRENT");
        if (!::dsn::utils::filesystem::file_exists(current_file)) {
            max_del_d = d;
            continue;
        }
        time_t tm;
        if (!dsn::utils::filesystem::last_write_time(current_file, tm)) {
            dwarn("get last write time of file %s failed", current_file.c_str());
            break;
        }
        uint64_t last_write_time = (uint64_t)tm;
        if (last_write_time + _checkpoint_reserve_time_seconds >= current_time) {
            // not expired
            break;
        }
        max_del_d = d;
    }
    if (max_del_d == -1) {
        // no checkpoint to delete
        ddebug("%s: no checkpoint to garbage collection, checkpoints_count = %d",
377
               replica_name(),
Q
qinzuoyan 已提交
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
               (int)temp_list.size());
        return;
    }
    std::list<int64_t> to_delete_list;
    int64_t min_d = 0;
    int64_t max_d = 0;
    int checkpoints_count = 0;
    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        int delete_max_index = -1;
        for (int i = 0; i < _checkpoints.size(); ++i) {
            int64_t del_d = _checkpoints[i];
            if (i + _checkpoint_reserve_min_count >= _checkpoints.size() || del_d > max_del_d)
                break;
            to_delete_list.push_back(del_d);
            delete_max_index = i;
        }
395
        if (delete_max_index >= 0) {
Q
qinzuoyan 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
            _checkpoints.erase(_checkpoints.begin(), _checkpoints.begin() + delete_max_index + 1);
        }

        if (!_checkpoints.empty()) {
            min_d = _checkpoints.front();
            max_d = _checkpoints.back();
            checkpoints_count = _checkpoints.size();
        } else {
            min_d = 0;
            max_d = 0;
            checkpoints_count = 0;
        }
    }

    // do delete
    std::list<int64_t> put_back_list;
    for (auto &del_d : to_delete_list) {
413 414
        auto cpt_dir =
            ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(del_d));
Q
qinzuoyan 已提交
415 416 417
        if (::dsn::utils::filesystem::directory_exists(cpt_dir)) {
            if (::dsn::utils::filesystem::remove_path(cpt_dir)) {
                ddebug("%s: checkpoint directory %s removed by garbage collection",
418
                       replica_name(),
Q
qinzuoyan 已提交
419 420 421
                       cpt_dir.c_str());
            } else {
                derror("%s: checkpoint directory %s remove failed by garbage collection",
422
                       replica_name(),
Q
qinzuoyan 已提交
423 424 425 426 427
                       cpt_dir.c_str());
                put_back_list.push_back(del_d);
            }
        } else {
            ddebug("%s: checkpoint directory %s does not exist, ignored by garbage collection",
428
                   replica_name(),
Q
qinzuoyan 已提交
429 430 431 432
                   cpt_dir.c_str());
        }
    }

433 434 435
    // put back checkpoints which is not deleted, to make it delete again in the next gc time.
    // ATTENTION: the put back checkpoint may be incomplete, which will cause failure on load. But
    // it would not cause data lost, because incomplete checkpoint can not be loaded successfully.
Q
qinzuoyan 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
    if (!put_back_list.empty()) {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        if (_checkpoints.empty() || put_back_list.back() < _checkpoints.front()) {
            // just insert to front will hold the order
            _checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
        } else {
            // need to re-sort
            _checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
            std::sort(_checkpoints.begin(), _checkpoints.end());
        }

        if (!_checkpoints.empty()) {
            min_d = _checkpoints.front();
            max_d = _checkpoints.back();
            checkpoints_count = _checkpoints.size();
        } else {
            min_d = 0;
            max_d = 0;
            checkpoints_count = 0;
        }
    }

    ddebug("%s: after checkpoint garbage collection, checkpoints_count = %d, "
           "min_checkpoint = %" PRId64 ", max_checkpoint = %" PRId64,
460
           replica_name(),
Q
qinzuoyan 已提交
461 462 463 464 465
           checkpoints_count,
           min_d,
           max_d);
}

466 467 468 469
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
                                                   int64_t timestamp,
                                                   dsn_message_t *requests,
                                                   int count)
Q
qinzuoyan 已提交
470 471 472 473 474
{
    dassert(_is_open, "");
    dassert(requests != nullptr, "");
    uint64_t start_time = dsn_now_ns();

475
    _physical_error = 0;
Q
qinzuoyan 已提交
476 477 478
    if (count == 1 &&
        ((::dsn::message_ex *)requests[0])->local_rpc_code ==
            ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
479
        _pfc_multi_put_qps->increment();
Q
qinzuoyan 已提交
480 481 482
        dsn_message_t request = requests[0];

        ::dsn::apps::update_response resp;
483 484
        resp.app_id = _gpid.get_app_id();
        resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
485 486 487 488 489 490 491 492 493
        resp.decree = decree;
        resp.server = _primary_address;

        ::dsn::apps::multi_put_request update;
        ::dsn::unmarshall(request, update);

        if (update.kvs.empty()) {
            // invalid argument
            derror("%s: invalid argument for multi_put: decree = %" PRId64 ", error = empty kvs",
494
                   replica_name(),
Q
qinzuoyan 已提交
495 496 497 498 499 500 501 502 503
                   decree);

            ::dsn::rpc_replier<::dsn::apps::update_response> replier(
                dsn_msg_create_response(request));
            if (!replier.is_empty()) {
                // an invalid operation shoundn't be added to latency calculation
                resp.error = rocksdb::Status::kInvalidArgument;
                replier(resp);
            }
504
            return 0;
Q
qinzuoyan 已提交
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
        }

        for (auto &kv : update.kvs) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, update.hash_key, kv.key);
            rocksdb::Slice slice_key(raw_key.data(), raw_key.length());
            rocksdb::SliceParts skey(&slice_key, 1);

            pegasus_generate_value(_value_schema_version,
                                   update.expire_ts_seconds,
                                   kv.value,
                                   _write_buf,
                                   _write_slices);
            rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

            _batch.Put(skey, svalue);
        }

        _wt_opts.given_decree = decree;
        rocksdb::Status status = _db->Write(_wt_opts, &_batch);
        if (!status.ok()) {
            derror("%s: rocksdb write failed for multi_put: decree = %" PRId64 ", error = %s",
527
                   replica_name(),
Q
qinzuoyan 已提交
528 529 530 531 532 533 534
                   decree,
                   status.ToString().c_str());
            _physical_error = status.code();
        }

        ::dsn::rpc_replier<::dsn::apps::update_response> replier(dsn_msg_create_response(request));
        if (!replier.is_empty()) {
535
            _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
Q
qinzuoyan 已提交
536 537 538 539 540
            resp.error = status.code();
            replier(resp);
        }

        _batch.Clear();
541
        return _physical_error;
Q
qinzuoyan 已提交
542 543 544
    } else if (count == 1 &&
               ((::dsn::message_ex *)requests[0])->local_rpc_code ==
                   ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
545
        _pfc_multi_remove_qps->increment();
Q
qinzuoyan 已提交
546 547 548
        dsn_message_t request = requests[0];

        ::dsn::apps::multi_remove_response resp;
549 550
        resp.app_id = _gpid.get_app_id();
        resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
551 552 553 554 555 556 557 558 559 560
        resp.decree = decree;
        resp.server = _primary_address;

        ::dsn::apps::multi_remove_request update;
        ::dsn::unmarshall(request, update);

        if (update.sort_keys.empty()) {
            // invalid argument
            derror("%s: invalid argument for multi_remove: decree = %" PRId64
                   ", error = empty sort keys",
561
                   replica_name(),
Q
qinzuoyan 已提交
562 563 564 565 566 567 568 569 570 571
                   decree);

            ::dsn::rpc_replier<::dsn::apps::multi_remove_response> replier(
                dsn_msg_create_response(request));
            if (!replier.is_empty()) {
                // an invalid operation shoundn't be added to latency calculation
                resp.error = rocksdb::Status::kInvalidArgument;
                resp.count = 0;
                replier(resp);
            }
572
            return 0;
Q
qinzuoyan 已提交
573 574 575 576 577 578 579 580 581 582 583 584
        }

        for (auto &sort_key : update.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, update.hash_key, sort_key);
            _batch.Delete(rocksdb::Slice(raw_key.data(), raw_key.length()));
        }

        _wt_opts.given_decree = decree;
        rocksdb::Status status = _db->Write(_wt_opts, &_batch);
        if (!status.ok()) {
            derror("%s: rocksdb write failed for multi_remove: decree = %" PRId64 ", error = %s",
585
                   replica_name(),
Q
qinzuoyan 已提交
586 587 588 589 590 591 592 593 594 595 596
                   decree,
                   status.ToString().c_str());
            _physical_error = status.code();
            resp.count = 0;
        } else {
            resp.count = update.sort_keys.size();
        }

        ::dsn::rpc_replier<::dsn::apps::multi_remove_response> replier(
            dsn_msg_create_response(request));
        if (!replier.is_empty()) {
597
            _pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
Q
qinzuoyan 已提交
598 599 600 601 602
            resp.error = status.code();
            replier(resp);
        }

        _batch.Clear();
603
        return _physical_error;
Q
qinzuoyan 已提交
604 605 606 607 608 609 610
    } else {
        for (int i = 0; i < count; ++i) {
            dsn_message_t request = requests[i];
            dassert(request != nullptr, "");
            ::dsn::message_ex *msg = (::dsn::message_ex *)request;
            ::dsn::blob key;
            if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_PUT) {
611
                _pfc_put_qps->increment();
Q
qinzuoyan 已提交
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
                ::dsn::apps::update_request update;
                ::dsn::unmarshall(request, update);
                key = update.key;

                rocksdb::Slice slice_key(key.data(), key.length());
                rocksdb::SliceParts skey(&slice_key, 1);

                pegasus_generate_value(_value_schema_version,
                                       update.expire_ts_seconds,
                                       update.value,
                                       _write_buf,
                                       _write_slices);
                rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

                _batch.Put(skey, svalue);
                _batch_repliers.emplace_back(dsn_msg_create_response(request));
628
                _batch_perfcounters.push_back(_pfc_put_latency.get());
Q
qinzuoyan 已提交
629
            } else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_REMOVE) {
630
                _pfc_remove_qps->increment();
Q
qinzuoyan 已提交
631 632 633 634 635
                ::dsn::unmarshall(request, key);

                rocksdb::Slice skey(key.data(), key.length());
                _batch.Delete(skey);
                _batch_repliers.emplace_back(dsn_msg_create_response(request));
636
                _batch_perfcounters.push_back(_pfc_remove_latency.get());
Q
qinzuoyan 已提交
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
            } else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
                       msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
                dassert(false,
                        "rpc code not allow batch: %s",
                        ::dsn::task_code(msg->local_rpc_code).to_string());
            } else {
                dassert(false,
                        "rpc code not handled: %s",
                        ::dsn::task_code(msg->local_rpc_code).to_string());
            }

            if (msg->header->client.partition_hash != 0) {
                uint64_t partition_hash = pegasus_key_hash(key);
                dassert(msg->header->client.partition_hash == partition_hash,
                        "inconsistent partition hash");
652
                int thread_hash = _gpid.thread_hash();
Q
qinzuoyan 已提交
653 654 655 656 657 658 659
                dassert(msg->header->client.thread_hash == thread_hash, "inconsistent thread hash");
            }

            if (_verbose_log) {
                ::dsn::blob hash_key, sort_key;
                pegasus_restore_key(key, hash_key, sort_key);
                ddebug("%s: rocksdb write: decree = %" PRId64
660
                       ", code = %s, hash_key = \"%s\", sort_key = \"%s\"",
661
                       replica_name(),
Q
qinzuoyan 已提交
662
                       decree,
663
                       msg->local_rpc_code.to_string(),
664 665
                       ::pegasus::utils::c_escape_string(hash_key).c_str(),
                       ::pegasus::utils::c_escape_string(sort_key).c_str());
Q
qinzuoyan 已提交
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
            }
        }
    }

    if (_batch.Count() == 0) {
        // write fake data
        rocksdb::Slice empty_key;
        rocksdb::SliceParts skey(&empty_key, 1);

        ::dsn::blob empty_value;
        pegasus_generate_value(_value_schema_version, 0, empty_value, _write_buf, _write_slices);
        rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

        _batch.Put(skey, svalue);
    }
    _wt_opts.given_decree = decree;
    auto status = _db->Write(_wt_opts, &_batch);
    if (!status.ok()) {
        derror("%s: rocksdb write failed: decree = %" PRId64 ", error = %s",
685
               replica_name(),
Q
qinzuoyan 已提交
686 687 688 689 690 691 692
               decree,
               status.ToString().c_str());
        _physical_error = status.code();
    }

    ::dsn::apps::update_response resp;
    resp.error = status.code();
693 694
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
695 696 697 698 699
    resp.decree = decree;
    resp.server = _primary_address;

    dassert(_batch_repliers.size() == _batch_perfcounters.size(),
            "%s: repliers's size(%u) vs perfcounters's size(%u) not match",
700
            replica_name(),
Q
qinzuoyan 已提交
701 702 703 704 705
            _batch_repliers.size(),
            _batch_perfcounters.size());
    uint64_t latency = dsn_now_ns() - start_time;
    for (unsigned int i = 0; i != _batch_repliers.size(); ++i) {
        if (!_batch_repliers[i].is_empty()) {
706
            _batch_perfcounters[i]->set(latency);
Q
qinzuoyan 已提交
707 708 709 710 711 712 713
            _batch_repliers[i](resp);
        }
    }

    _batch.Clear();
    _batch_repliers.clear();
    _batch_perfcounters.clear();
714 715

    return _physical_error;
Q
qinzuoyan 已提交
716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746
}

// Single-put handler stub: unconditionally trips dassert with "not implemented".
// Neither `update` nor `reply` is used.
// NOTE(review): presumably puts are applied through the batched write path instead -- confirm.
void pegasus_server_impl::on_put(const ::dsn::apps::update_request &update,
                                 ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    dassert(false, "not implemented");
}

// Multi-put handler stub: unconditionally trips dassert with "not implemented".
// Neither `args` nor `reply` is used.
// NOTE(review): presumably multi-puts are applied through the batched write path -- confirm.
void pegasus_server_impl::on_multi_put(const ::dsn::apps::multi_put_request &args,
                                       ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    dassert(false, "not implemented");
}

// Single-remove handler stub: unconditionally trips dassert with "not implemented".
// Neither `key` nor `reply` is used.
// NOTE(review): presumably removes are applied through the batched write path -- confirm.
void pegasus_server_impl::on_remove(const ::dsn::blob &key,
                                    ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    dassert(false, "not implemented");
}

// Multi-remove handler stub: unconditionally trips dassert with "not implemented".
// Neither `args` nor `reply` is used.
// NOTE(review): presumably multi-removes are applied through the batched write path -- confirm.
void pegasus_server_impl::on_multi_remove(
    const ::dsn::apps::multi_remove_request &args,
    ::dsn::rpc_replier<::dsn::apps::multi_remove_response> &reply)
{
    dassert(false, "not implemented");
}

// Handles a point read.
//
// Looks `key` (the raw rocksdb key) up in `_db`, treats a value whose expire timestamp has
// passed as NotFound, and replies with the rocksdb status code plus, on success, the user
// data extracted from the stored value.
//
// Side effects: bumps the get QPS counter, records the get latency, bumps the recent-expire
// counter when an expired value is hit, and bumps the recent-abnormal counter (with a warning
// log) when the configured time/size thresholds are reached.
void pegasus_server_impl::on_get(const ::dsn::blob &key,
                                 ::dsn::rpc_replier<::dsn::apps::read_response> &reply)
{
    dassert(_is_open, "");
    _pfc_get_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::read_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    rocksdb::Slice skey(key.data(), key.length());
    // Heap-allocated so that ownership of the buffer can be handed to
    // pegasus_extract_user_data() below without copying.
    std::unique_ptr<std::string> value = ::dsn::make_unique<std::string>();
    rocksdb::Status status = _db->Get(_rd_opts, skey, value.get());

    if (status.ok()) {
        uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, *value);
        // expire_ts == 0 means the record carries no ttl.
        if (expire_ts > 0 && expire_ts <= ::pegasus::utils::epoch_now()) {
            _pfc_recent_expire_count->increment();
            if (_verbose_log) {
                derror("%s: rocksdb data expired for get", replica_name());
            }
            // An expired record is reported to the client as if it did not exist.
            status = rocksdb::Status::NotFound();
        }
    }

    if (!status.ok()) {
        if (_verbose_log) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            derror("%s: rocksdb get failed for get: "
                   "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                   replica_name(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   ::pegasus::utils::c_escape_string(sort_key).c_str(),
                   status.ToString().c_str());
        } else if (!status.IsNotFound()) {
            // Without verbose logging, NotFound is considered normal and is not logged.
            derror("%s: rocksdb get failed for get: error = %s",
                   replica_name(),
                   status.ToString().c_str());
        }
    }

    // Must run before `value` is moved out below: the size check reads value->size().
    if (_abnormal_get_time_threshold_ns || _abnormal_get_size_threshold) {
        uint64_t time_used = dsn_now_ns() - start_time;
        if ((_abnormal_get_time_threshold_ns && time_used >= _abnormal_get_time_threshold_ns) ||
            (_abnormal_get_size_threshold && value->size() >= _abnormal_get_size_threshold)) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            dwarn("%s: rocksdb abnormal get: "
                  "hash_key = \"%s\", sort_key = \"%s\", return = %s, "
                  "value_size = %d, time_used = %" PRIu64 " ns",
                  replica_name(),
                  ::pegasus::utils::c_escape_string(hash_key).c_str(),
                  ::pegasus::utils::c_escape_string(sort_key).c_str(),
                  status.ToString().c_str(),
                  static_cast<int>(value->size()),
                  time_used);
            _pfc_recent_abnormal_count->increment();
        }
    }

    resp.error = status.code();
    if (status.ok()) {
        pegasus_extract_user_data(_value_schema_version, std::move(value), resp.value);
    }

    _pfc_get_latency->set(dsn_now_ns() - start_time);

    reply(resp);
}

// Handles a multi-get under a single hash key.
//
// Two modes, selected by whether `request.sort_keys` is empty:
//  * empty     -> range scan over [start_sortkey, stop_sortkey] (optionally narrowed by a
//                 prefix sort-key filter), forward or reverse depending on `request.reverse`;
//  * non-empty -> point lookups of the listed sort keys through rocksdb MultiGet.
//
// Results are limited by `request.max_kv_count` / `request.max_kv_size` (non-positive means
// unlimited). `resp.error` is kOk when everything requested was returned, kIncomplete when
// the scan/lookup stopped early because of a limit, kInvalidArgument for an unsupported
// filter type, or the rocksdb error code on failure (in which case `resp.kvs` is cleared).
//
// Side effects: bumps the multi_get QPS counter, records latency on every return path, and
// adds to the recent expire/filter/abnormal counters as appropriate.
void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &request,
                                       ::dsn::rpc_replier<::dsn::apps::multi_get_response> &reply)
{
    dassert(_is_open, "");
    _pfc_multi_get_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::multi_get_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        derror("%s: filter type %d not supported for multi_get",
               replica_name(),
               request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
        reply(resp);
        return;
    }

    // Non-positive limits mean "no limit".
    int32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
    int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    int32_t count = 0;         // number of kvs placed in the response
    int64_t size = 0;          // accumulated key + value bytes of those kvs
    int32_t iterate_count = 0; // iterator positions visited (range mode only)
    int32_t expire_count = 0;
    int32_t filter_count = 0;

    if (request.sort_keys.empty()) {
        ::dsn::blob range_start_key, range_stop_key;
        pegasus_generate_key(range_start_key, request.hash_key, request.start_sortkey);
        bool start_inclusive = request.start_inclusive;
        bool stop_inclusive;
        if (request.stop_sortkey.length() == 0) {
            // No stop sort key given: scan up to (but excluding) the key generated by
            // pegasus_generate_next_blob() for this hash key.
            pegasus_generate_next_blob(range_stop_key, request.hash_key);
            stop_inclusive = false;
        } else {
            pegasus_generate_key(range_stop_key, request.hash_key, request.stop_sortkey);
            stop_inclusive = request.stop_inclusive;
        }

        rocksdb::Slice start(range_start_key.data(), range_start_key.length());
        rocksdb::Slice stop(range_stop_key.data(), range_stop_key.length());

        // limit key range by prefix filter
        // (the blobs are declared outside the `if` because `start`/`stop` may end up
        // pointing into them)
        ::dsn::blob prefix_start_key, prefix_stop_key;
        if (request.sort_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
            request.sort_key_filter_pattern.length() > 0) {
            pegasus_generate_key(
                prefix_start_key, request.hash_key, request.sort_key_filter_pattern);
            pegasus_generate_next_blob(
                prefix_stop_key, request.hash_key, request.sort_key_filter_pattern);

            rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
            if (prefix_start.compare(start) > 0) {
                start = prefix_start;
                start_inclusive = true;
            }

            rocksdb::Slice prefix_stop(prefix_stop_key.data(), prefix_stop_key.length());
            if (prefix_stop.compare(stop) <= 0) {
                stop = prefix_stop;
                stop_inclusive = false;
            }
        }

        // check if range is empty
        int range_cmp = start.compare(stop);
        if (range_cmp > 0 || (range_cmp == 0 && (!start_inclusive || !stop_inclusive))) {
            // empty sort key range
            if (_verbose_log) {
                dwarn("%s: empty sort key range for multi_get: hash_key = \"%s\", "
                      "start_sort_key = \"%s\" (%s), stop_sort_key = \"%s\" (%s), "
                      "sort_key_filter_type = %s, sort_key_filter_pattern = \"%s\", "
                      "final_start = \"%s\" (%s), final_stop = \"%s\" (%s)",
                      replica_name(),
                      ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                      ::pegasus::utils::c_escape_string(request.start_sortkey).c_str(),
                      request.start_inclusive ? "inclusive" : "exclusive",
                      ::pegasus::utils::c_escape_string(request.stop_sortkey).c_str(),
                      request.stop_inclusive ? "inclusive" : "exclusive",
                      ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
                          ->second,
                      ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern).c_str(),
                      ::pegasus::utils::c_escape_string(start).c_str(),
                      start_inclusive ? "inclusive" : "exclusive",
                      ::pegasus::utils::c_escape_string(stop).c_str(),
                      stop_inclusive ? "inclusive" : "exclusive");
            }
            resp.error = rocksdb::Status::kOk;
            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
            reply(resp);
            return;
        }

        std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
        // Set when the scan reached the far end of the range (as opposed to stopping
        // because a count/size limit was hit).
        bool complete = false;
        if (!request.reverse) {
            it->Seek(start);
            bool first_exclusive = !start_inclusive;
            while (count < max_kv_count && size < max_kv_size && it->Valid()) {
                iterate_count++;

                // check stop sort key
                int c = it->key().compare(stop);
                if (c > 0 || (c == 0 && !stop_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check start sort key
                // (only the very first position can equal an exclusive start key)
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(start) == 0) {
                        // discard start_sortkey
                        it->Next();
                        continue;
                    }
                }

                // extract value
                // r == 1: appended to resp.kvs; r == 2: expired; otherwise: filtered out
                int r = append_key_value_for_multi_get(resp.kvs,
                                                       it->key(),
                                                       it->value(),
                                                       request.sort_key_filter_type,
                                                       request.sort_key_filter_pattern,
                                                       epoch_now,
                                                       request.no_value);
                if (r == 1) {
                    count++;
                    auto &kv = resp.kvs.back();
                    size += kv.key.length() + kv.value.length();
                } else if (r == 2) {
                    expire_count++;
                } else { // r == 3
                    filter_count++;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Next();
            }
        } else { // reverse
            it->SeekForPrev(stop);
            bool first_exclusive = !stop_inclusive;
            // Collected in descending key order, then reversed into resp.kvs below.
            std::vector<::dsn::apps::key_value> reverse_kvs;
            while (count < max_kv_count && size < max_kv_size && it->Valid()) {
                iterate_count++;

                // check start sort key
                int c = it->key().compare(start);
                if (c < 0 || (c == 0 && !start_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check stop sort key
                // (only the very first position can equal an exclusive stop key)
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(stop) == 0) {
                        // discard stop_sortkey
                        it->Prev();
                        continue;
                    }
                }

                // extract value
                // r == 1: appended to reverse_kvs; r == 2: expired; otherwise: filtered out
                int r = append_key_value_for_multi_get(reverse_kvs,
                                                       it->key(),
                                                       it->value(),
                                                       request.sort_key_filter_type,
                                                       request.sort_key_filter_pattern,
                                                       epoch_now,
                                                       request.no_value);
                if (r == 1) {
                    count++;
                    auto &kv = reverse_kvs.back();
                    size += kv.key.length() + kv.value.length();
                } else if (r == 2) {
                    expire_count++;
                } else { // r == 3
                    filter_count++;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Prev();
            }

            if (it->status().ok() && !reverse_kvs.empty()) {
                // revert order to make resp.kvs ordered in sort_key
                resp.kvs.reserve(reverse_kvs.size());
                for (auto rit = reverse_kvs.rbegin(); rit != reverse_kvs.rend(); ++rit) {
                    resp.kvs.emplace_back(std::move(*rit));
                }
            }
        }

        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (_verbose_log) {
                derror("%s: rocksdb scan failed for multi_get: hash_key = \"%s\", "
                       "reverse = %s, error = %s",
                       replica_name(),
                       ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                       request.reverse ? "true" : "false",
                       it->status().ToString().c_str());
            } else {
                derror("%s: rocksdb scan failed for multi_get: reverse = %s, error = %s",
                       replica_name(),
                       request.reverse ? "true" : "false",
                       it->status().ToString().c_str());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // scan not completed
            resp.error = rocksdb::Status::kIncomplete;
        }
    } else {
        bool error_occurred = false;
        rocksdb::Status final_status;
        bool exceed_limit = false;
        // `keys` holds slices pointing into the blobs owned by `keys_holder`.
        std::vector<::dsn::blob> keys_holder;
        std::vector<rocksdb::Slice> keys;
        std::vector<std::string> values;
        keys_holder.reserve(request.sort_keys.size());
        keys.reserve(request.sort_keys.size());
        for (auto &sort_key : request.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, request.hash_key, sort_key);
            keys.emplace_back(raw_key.data(), raw_key.length());
            keys_holder.emplace_back(std::move(raw_key));
        }

        std::vector<rocksdb::Status> statuses = _db->MultiGet(_rd_opts, keys, &values);
        for (size_t i = 0; i < keys.size(); i++) {
            rocksdb::Status &status = statuses[i];
            std::string &value = values[i];
            // print log
            if (!status.ok()) {
                if (_verbose_log) {
                    derror("%s: rocksdb get failed for multi_get: "
                           "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                           replica_name(),
                           ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                           ::pegasus::utils::c_escape_string(request.sort_keys[i]).c_str(),
                           status.ToString().c_str());
                } else if (!status.IsNotFound()) {
                    derror("%s: rocksdb get failed for multi_get: error = %s",
                           replica_name(),
                           status.ToString().c_str());
                }
            }
            // check ttl
            if (status.ok()) {
                uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
                if (expire_ts > 0 && expire_ts <= epoch_now) {
                    expire_count++;
                    if (_verbose_log) {
                        derror("%s: rocksdb data expired for multi_get", replica_name());
                    }
                    // An expired record is treated as if it did not exist.
                    status = rocksdb::Status::NotFound();
                }
            }
            // extract value
            if (status.ok()) {
                // check if exceed limit
                if (count >= max_kv_count || size >= max_kv_size) {
                    exceed_limit = true;
                    break;
                }
                ::dsn::apps::key_value kv;
                kv.key = request.sort_keys[i];
                if (!request.no_value) {
                    pegasus_extract_user_data(_value_schema_version,
                                              ::dsn::make_unique<std::string>(std::move(value)),
                                              kv.value);
                }
                count++;
                size += kv.key.length() + kv.value.length();
                resp.kvs.emplace_back(std::move(kv));
            }
            // if error occurred
            // (NotFound is not an error here: the key is simply omitted from the result)
            if (!status.ok() && !status.IsNotFound()) {
                error_occurred = true;
                final_status = status;
                break;
            }
        }

        if (error_occurred) {
            resp.error = final_status.code();
            resp.kvs.clear();
        } else if (exceed_limit) {
            resp.error = rocksdb::Status::kIncomplete;
        } else {
            resp.error = rocksdb::Status::kOk;
        }
    }

    if (_abnormal_multi_get_time_threshold_ns || _abnormal_multi_get_size_threshold ||
        _abnormal_multi_get_iterate_count_threshold) {
        uint64_t time_used = dsn_now_ns() - start_time;
        if ((_abnormal_multi_get_time_threshold_ns &&
             time_used >= _abnormal_multi_get_time_threshold_ns) ||
            (_abnormal_multi_get_size_threshold &&
             (uint64_t)size >= _abnormal_multi_get_size_threshold) ||
            (_abnormal_multi_get_iterate_count_threshold &&
             (uint64_t)iterate_count >= _abnormal_multi_get_iterate_count_threshold)) {
            dwarn("%s: rocksdb abnormal multi_get: hash_key = \"%s\", "
                  "result_count = %d, result_size = %" PRId64 ", iterate_count = %d, "
                  "expire_count = %d, filter_count = %d, time_used = %" PRIu64 " ns",
                  replica_name(),
                  ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                  count,
                  size,
                  iterate_count,
                  expire_count,
                  filter_count,
                  time_used);
            _pfc_recent_abnormal_count->increment();
        }
    }

    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }
    if (filter_count > 0) {
        _pfc_recent_filter_count->add(filter_count);
    }
    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);

    reply(resp);
}

// Counts the records stored under `hash_key` whose ttl has not expired.
//
// Scans from the key generated for (hash_key, empty sort key) up to the upper bound produced
// by pegasus_generate_next_blob(hash_key). Replies with the count and the iterator's status
// code; on a scan error the count is reset to 0.
//
// Side effects: adds the number of expired records seen to the recent-expire counter.
void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
                                           ::dsn::rpc_replier<::dsn::apps::count_response> &reply)
{
    dassert(_is_open, "");

    ::dsn::apps::count_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    // scan
    ::dsn::blob start_key, stop_key;
    pegasus_generate_key(start_key, hash_key, ::dsn::blob());
    pegasus_generate_next_blob(stop_key, hash_key);
    rocksdb::Slice start(start_key.data(), start_key.length());
    rocksdb::Slice stop(stop_key.data(), stop_key.length());
    rocksdb::ReadOptions options = _rd_opts;
    // `options` stores a pointer to `stop`, so `stop` (and `stop_key`) must stay alive for
    // as long as the iterator is used; both live until the end of this function.
    options.iterate_upper_bound = &stop;
    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(options));
    it->Seek(start);
    resp.count = 0;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    uint64_t expire_count = 0;
    while (it->Valid()) {
        uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, it->value());
        // expire_ts == 0 means the record carries no ttl.
        if (expire_ts > 0 && expire_ts <= epoch_now) {
            expire_count++;
            if (_verbose_log) {
                derror("%s: rocksdb data expired for sortkey_count", replica_name());
            }
        } else {
            resp.count++;
        }
        it->Next();
    }
    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }

    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (_verbose_log) {
            derror("%s: rocksdb scan failed for sortkey_count: hash_key = \"%s\", error = %s",
                   replica_name(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   it->status().ToString().c_str());
        } else {
            derror("%s: rocksdb scan failed for sortkey_count: error = %s",
                   replica_name(),
                   it->status().ToString().c_str());
        }
        // A partial count would be misleading, so report none.
        resp.count = 0;
    }

    reply(resp);
}

// Reports the remaining time-to-live of the record stored under `key` (the raw rocksdb key).
//
// Replies with the rocksdb status code; an expired record is reported as NotFound. On
// success `resp.ttl_seconds` is the number of seconds until expiry, or -1 when the record
// has no ttl. `resp.ttl_seconds` is not assigned on failure.
//
// Side effects: bumps the recent-expire counter when an expired record is hit.
void pegasus_server_impl::on_ttl(const ::dsn::blob &key,
                                 ::dsn::rpc_replier<::dsn::apps::ttl_response> &reply)
{
    dassert(_is_open, "");

    ::dsn::apps::ttl_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    rocksdb::Slice skey(key.data(), key.length());
    std::string value;
    rocksdb::Status status = _db->Get(_rd_opts, skey, &value);

    // Initialized so it never holds an indeterminate value when the read fails; it is only
    // consulted below when status is ok, in which case it has been extracted from `value`.
    uint32_t expire_ts = 0;
    uint32_t now_ts = ::pegasus::utils::epoch_now();
    if (status.ok()) {
        expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
        if (expire_ts > 0 && expire_ts <= now_ts) {
            _pfc_recent_expire_count->increment();
            if (_verbose_log) {
                derror("%s: rocksdb data expired for ttl", replica_name());
            }
            // An expired record is reported to the client as if it did not exist.
            status = rocksdb::Status::NotFound();
        }
    }

    if (!status.ok()) {
        if (_verbose_log) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            derror("%s: rocksdb get failed for ttl: "
                   "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                   replica_name(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   ::pegasus::utils::c_escape_string(sort_key).c_str(),
                   status.ToString().c_str());
        } else if (!status.IsNotFound()) {
            // Without verbose logging, NotFound is considered normal and is not logged.
            derror("%s: rocksdb get failed for ttl: error = %s",
                   replica_name(),
                   status.ToString().c_str());
        }
    }

    resp.error = status.code();
    if (status.ok()) {
        if (expire_ts > 0) {
            // status is still ok here, so expire_ts > now_ts and the difference is positive.
            resp.ttl_seconds = expire_ts - now_ts;
        } else {
            // no ttl
            resp.ttl_seconds = -1;
        }
    }

    reply(resp);
}

// Task code under which on_get_scanner() enqueues the delayed task that fetches a cached
// scan context by its handle.
DEFINE_TASK_CODE(LOCAL_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
1281
// Starts a scan over the raw key range [request.start_key, request.stop_key] and returns the
// first batch of at most `request.batch_size` records.
//
// `resp.error` is kInvalidArgument for an unsupported hash-key or sort-key filter type, the
// iterator's error code if the scan fails (in which case `resp.kvs` is cleared), and kOk
// otherwise. When the batch filled up before the range was exhausted, the iterator and scan
// parameters are stored in `_context_cache` and `resp.context_id` is the cache handle;
// when the range was exhausted `resp.context_id` is pegasus::SCAN_CONTEXT_ID_COMPLETED.
//
// Side effects: bumps the scan QPS counter, records scan latency (except on the
// kInvalidArgument returns), adds to the recent expire/filter counters, and enqueues a
// delayed task that fetches the cached context by its handle after 5 minutes.
void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request &request,
                                         ::dsn::rpc_replier<::dsn::apps::scan_response> &reply)
{
    dassert(_is_open, "");
    _pfc_scan_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::scan_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!is_filter_type_supported(request.hash_key_filter_type)) {
        derror("%s: filter type %d not supported for get_scanner",
               replica_name(),
               request.hash_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        reply(resp);
        return;
    }
    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        derror("%s: filter type %d not supported for get_scanner",
               replica_name(),
               request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        reply(resp);
        return;
    }

    bool start_inclusive = request.start_inclusive;
    bool stop_inclusive = request.stop_inclusive;
    rocksdb::Slice start(request.start_key.data(), request.start_key.length());
    rocksdb::Slice stop(request.stop_key.data(), request.stop_key.length());

    // limit key range by prefix filter
    // because data is not ordered by hash key (hash key "aa" is greater than "b"),
    // so we can only limit the start range by hash key filter.
    // (the blob is declared outside the `if` because `start` may end up pointing into it)
    ::dsn::blob prefix_start_key;
    if (request.hash_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
        request.hash_key_filter_pattern.length() > 0) {
        pegasus_generate_key(prefix_start_key, request.hash_key_filter_pattern, ::dsn::blob());
        rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
        if (prefix_start.compare(start) > 0) {
            start = prefix_start;
            start_inclusive = true;
        }
    }

    // check if range is empty
    int range_cmp = start.compare(stop);
    if (range_cmp > 0 || (range_cmp == 0 && (!start_inclusive || !stop_inclusive))) {
        // empty key range
        if (_verbose_log) {
            dwarn("%s: empty key range for get_scanner: "
                  "start_key = \"%s\" (%s), stop_key = \"%s\" (%s)",
                  replica_name(),
                  ::pegasus::utils::c_escape_string(request.start_key).c_str(),
                  request.start_inclusive ? "inclusive" : "exclusive",
                  ::pegasus::utils::c_escape_string(request.stop_key).c_str(),
                  request.stop_inclusive ? "inclusive" : "exclusive");
        }
        resp.error = rocksdb::Status::kOk;
        // Record into the scan latency counter: this is a scan request, not a multi_get.
        _pfc_scan_latency->set(dsn_now_ns() - start_time);
        reply(resp);
        return;
    }

    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
    it->Seek(start);
    // Set when the scan reached the end of the range (as opposed to stopping because the
    // batch filled up).
    bool complete = false;
    bool first_exclusive = !start_inclusive;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    uint64_t expire_count = 0;
    uint64_t filter_count = 0;
    int32_t count = 0;
    resp.kvs.reserve(request.batch_size);
    while (count < request.batch_size && it->Valid()) {
        int c = it->key().compare(stop);
        if (c > 0 || (c == 0 && !stop_inclusive)) {
            // out of range
            complete = true;
            break;
        }

        // only the very first position can equal an exclusive start key
        if (first_exclusive) {
            first_exclusive = false;
            if (it->key().compare(start) == 0) {
                // discard start_sortkey
                it->Next();
                continue;
            }
        }

        // r == 1: appended to resp.kvs; r == 2: expired; otherwise: filtered out
        int r = append_key_value_for_scan(resp.kvs,
                                          it->key(),
                                          it->value(),
                                          request.hash_key_filter_type,
                                          request.hash_key_filter_pattern,
                                          request.sort_key_filter_type,
                                          request.sort_key_filter_pattern,
                                          epoch_now,
                                          request.no_value);
        if (r == 1) {
            count++;
        } else if (r == 2) {
            expire_count++;
        } else { // r == 3
            filter_count++;
        }

        if (c == 0) {
            // seek to the last position
            complete = true;
            break;
        }

        it->Next();
    }

    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (_verbose_log) {
            // `start` may have been narrowed by the hash-key prefix filter above, so report
            // the inclusiveness that actually applies to it rather than the requested one.
            derror("%s: rocksdb scan failed for get_scanner: "
                   "start_key = \"%s\" (%s), stop_key = \"%s\" (%s), "
                   "batch_size = %d, read_count = %d, error = %s",
                   replica_name(),
                   ::pegasus::utils::c_escape_string(start).c_str(),
                   start_inclusive ? "inclusive" : "exclusive",
                   ::pegasus::utils::c_escape_string(stop).c_str(),
                   stop_inclusive ? "inclusive" : "exclusive",
                   request.batch_size,
                   count,
                   it->status().ToString().c_str());
        } else {
            derror("%s: rocksdb scan failed for get_scanner: error = %s",
                   replica_name(),
                   it->status().ToString().c_str());
        }
        resp.kvs.clear();
    } else if (it->Valid() && !complete) {
        // scan not completed
        // Ownership of the iterator moves into the context; `it` must not be used afterwards.
        std::unique_ptr<pegasus_scan_context> context(
            new pegasus_scan_context(std::move(it),
                                     std::string(stop.data(), stop.size()),
                                     request.stop_inclusive,
                                     request.hash_key_filter_type,
                                     std::string(request.hash_key_filter_pattern.data(),
                                                 request.hash_key_filter_pattern.length()),
                                     request.sort_key_filter_type,
                                     std::string(request.sort_key_filter_pattern.data(),
                                                 request.sort_key_filter_pattern.length()),
                                     request.batch_size,
                                     request.no_value));
        int64_t handle = _context_cache.put(std::move(context));
        resp.context_id = handle;
        // if the context is used, it will be fetched and re-put into cache,
        // which will change the handle,
        // then the delayed task will fetch null context by old handle, and do nothing.
        ::dsn::tasking::enqueue(LOCAL_PEGASUS_SERVER_DELAY,
                                this,
                                [this, handle]() { _context_cache.fetch(handle); },
                                0,
                                std::chrono::minutes(5));
    } else {
        // scan completed
        resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
    }

    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }
    if (filter_count > 0) {
        _pfc_recent_filter_count->add(filter_count);
    }

    _pfc_scan_latency->set(dsn_now_ns() - start_time);
    reply(resp);
}

1461
// Serves one follow-up batch of a scan session.
//
// The session state (iterator, stop key, filters, batch size, no_value flag) is
// fetched out of _context_cache by request.context_id. If no context is found,
// the response error is set to rocksdb::Status::Code::kNotFound.
//
// Up to batch_size records are appended to resp.kvs. When the iterator is still
// valid and the stop key has not been reached, the context is put back into the
// cache under a new handle (returned in resp.context_id) and a delayed task is
// enqueued to drop it after 5 minutes; otherwise resp.context_id is set to
// SCAN_CONTEXT_ID_COMPLETED. On iterator error the collected kvs are cleared and
// the context is not re-cached.
void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
                                  ::dsn::rpc_replier<::dsn::apps::scan_response> &reply)
{
    dassert(_is_open, "");
    _pfc_scan_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::scan_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
    if (context) {
        rocksdb::Iterator *it = context->iterator.get();
        int32_t batch_size = context->batch_size;
        const rocksdb::Slice &stop = context->stop;
        bool stop_inclusive = context->stop_inclusive;
        ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
        const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
        // The sort key filter must come from the sort key fields of the context;
        // reading the hash key fields here would apply the wrong filter to every
        // batch after the first one.
        ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
        const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
        bool no_value = context->no_value;
        bool complete = false;
        uint32_t epoch_now = ::pegasus::utils::epoch_now();
        uint64_t expire_count = 0;
        uint64_t filter_count = 0;
        int32_t count = 0;

        while (count < batch_size && it->Valid()) {
            int c = it->key().compare(stop);
            if (c > 0 || (c == 0 && !stop_inclusive)) {
                // out of range
                complete = true;
                break;
            }

            int r = append_key_value_for_scan(resp.kvs,
                                              it->key(),
                                              it->value(),
                                              hash_key_filter_type,
                                              hash_key_filter_pattern,
                                              sort_key_filter_type,
                                              sort_key_filter_pattern,
                                              epoch_now,
                                              no_value);
            if (r == 1) {
                // record appended to the response
                count++;
            } else if (r == 2) {
                // record skipped and counted as expired
                expire_count++;
            } else { // r == 3
                // record skipped and counted as filtered
                filter_count++;
            }

            if (c == 0) {
                // seek to the last position
                complete = true;
                break;
            }

            it->Next();
        }

        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (_verbose_log) {
                derror("%s: rocksdb scan failed for scan: "
                       "context_id= %" PRId64 ", stop_key = \"%s\" (%s), "
                       "batch_size = %d, read_count = %d, error = %s",
                       replica_name(),
                       request.context_id,
                       ::pegasus::utils::c_escape_string(stop).c_str(),
                       stop_inclusive ? "inclusive" : "exclusive",
                       batch_size,
                       count,
                       it->status().ToString().c_str());
            } else {
                derror("%s: rocksdb scan failed for scan: error = %s",
                       replica_name(),
                       it->status().ToString().c_str());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // scan not completed: re-cache the context under a fresh handle.
            // If the client uses it again, the handle changes once more, so the
            // delayed task below fetches nothing by the old handle.
            int64_t handle = _context_cache.put(std::move(context));
            resp.context_id = handle;
            ::dsn::tasking::enqueue(LOCAL_PEGASUS_SERVER_DELAY,
                                    this,
                                    [this, handle]() { _context_cache.fetch(handle); },
                                    0,
                                    std::chrono::minutes(5));
        } else {
            // scan completed
            resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
        }

        if (expire_count > 0) {
            _pfc_recent_expire_count->add(expire_count);
        }
        if (filter_count > 0) {
            _pfc_recent_filter_count->add(filter_count);
        }
    } else {
        resp.error = rocksdb::Status::Code::kNotFound;
    }

    _pfc_scan_latency->set(dsn_now_ns() - start_time);
    reply(resp);
}

// Drops the scan context identified by `args`: fetching takes it out of the
// cache, and the returned owner is discarded right away.
void pegasus_server_impl::on_clear_scanner(const int64_t &args)
{
    _context_cache.fetch(args);
}

// Task code of the timer task created in start() that periodically calls
// updating_rocksdb_sstsize().
DEFINE_TASK_CODE(UPDATING_ROCKSDB_SSTSIZE, TASK_PRIORITY_COMMON, THREAD_POOL_REPLICATION_LONG)

// Opens the rocksdb instance under <data_dir>/rdb and brings the app online.
//
// argc/argv carry the app envs from which the restore directory is parsed.
// Returns ERR_OK on success, ERR_FILE_OPERATION_FAILED when a required restore
// cannot be carried out, ERR_LOCAL_APP_FAILURE when the db cannot be opened or
// has an unsupported value schema version, or the error of async_checkpoint()
// when the initial checkpoint fails twice.
::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
    dassert(!_is_open, "");
    ddebug("%s: start to open app %s", replica_name(), data_dir().c_str());

    rocksdb::Options opts = _db_opts;
    opts.create_if_missing = true;
    opts.error_if_exists = false;
    opts.compaction_filter = &_key_ttl_compaction_filter;
    opts.default_value_schema_version = PEGASUS_VALUE_SCHEMA_MAX_VERSION;

    auto rdb_path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");

    //
    // Three situations are told apart here:
    //  case 1: the db already exists and is simply opened
    //  case 2: a brand new db is opened
    //  case 3: the db is restored from old data
    //
    // Restoring only happens when all of the following hold:
    //  1, rdb does not exist
    //  2, restore info can be parsed from the app env, which is stored in argv
    //  3, restore_dir exists
    //
    if (::dsn::utils::filesystem::path_exists(rdb_path)) {
        // only case 1
        ddebug("%s: rdb is already exist, path = %s", replica_name(), rdb_path.c_str());
    } else {
        std::pair<std::string, bool> restore_info = get_restore_dir_from_env(argc, argv);
        const std::string &restore_dir = restore_info.first;
        const bool force_restore = restore_info.second;

        if (restore_dir.empty()) {
            // case 2
            if (force_restore) {
                derror("%s: try to restore, but we can't combine restore_dir from envs",
                       replica_name());
                return ::dsn::ERR_FILE_OPERATION_FAILED;
            }
            dinfo("%s: open a new db, path = %s", replica_name(), rdb_path.c_str());
        } else {
            // case 3
            ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str());
            if (!::dsn::utils::filesystem::directory_exists(restore_dir)) {
                if (force_restore) {
                    derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s",
                           replica_name(),
                           restore_dir.c_str());
                    return ::dsn::ERR_FILE_OPERATION_FAILED;
                }
                dwarn("%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
                      "it, the role of this replica must not primary, so we open a new db on the "
                      "path(%s)",
                      replica_name(),
                      restore_dir.c_str(),
                      rdb_path.c_str());
            } else if (!::dsn::utils::filesystem::rename_path(restore_dir.c_str(),
                                                              rdb_path.c_str())) {
                derror("%s: rename restore_dir(%s) to rdb(%s) failed",
                       replica_name(),
                       restore_dir.c_str(),
                       rdb_path.c_str());
                return ::dsn::ERR_FILE_OPERATION_FAILED;
            } else {
                // restore_dir has become rdb; continue with the normal open below
                ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed",
                       replica_name(),
                       restore_dir.c_str(),
                       rdb_path.c_str());
            }
        }
    }

    ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), rdb_path.c_str());

    auto open_status = rocksdb::DB::Open(opts, rdb_path, &_db);
    if (!open_status.ok()) {
        derror("%s: open app failed, error = %s", replica_name(), open_status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    _value_schema_version = _db->GetValueSchemaVersion();
    if (_value_schema_version > PEGASUS_VALUE_SCHEMA_MAX_VERSION) {
        derror("%s: open app failed, unsupported value schema version %" PRIu32,
               replica_name(),
               _value_schema_version);
        delete _db;
        _db = nullptr;
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // the filter is only enabled once the correct value_schema_version is set
    _key_ttl_compaction_filter.SetValueSchemaVersion(_value_schema_version);
    _key_ttl_compaction_filter.EnableFilter();

    parse_checkpoints();

    int64_t flushed_decree = _db->GetLastFlushedDecree();
    if (flushed_decree != last_durable_decree()) {
        ddebug("%s: start to do async checkpoint, last_durable_decree = %" PRId64
               ", last_flushed_decree = %" PRId64,
               replica_name(),
               last_durable_decree(),
               flushed_decree);
        auto chkpt_err = async_checkpoint(false);
        if (chkpt_err != ::dsn::ERR_OK) {
            dwarn("%s: create checkpoint failed, error = %s, retry again",
                  replica_name(),
                  chkpt_err.to_string());
            // one more attempt before giving up
            chkpt_err = async_checkpoint(false);
            if (chkpt_err != ::dsn::ERR_OK) {
                derror("%s: create checkpoint failed, error = %s",
                       replica_name(),
                       chkpt_err.to_string());
                delete _db;
                _db = nullptr;
                return chkpt_err;
            }
        }
        dassert(flushed_decree == last_durable_decree(),
                "last durable decree mismatch after checkpoint: %" PRId64 " vs %" PRId64,
                flushed_decree,
                last_durable_decree());
    }

    ddebug("%s: open app succeed, value_schema_version = %" PRIu32
           ", last_durable_decree = %" PRId64 "",
           replica_name(),
           _value_schema_version,
           last_durable_decree());

    _is_open = true;

    dinfo("%s: start the updating sstsize timer task", replica_name());
    // a ::dsn::timer_task keeps the rocksdb sstsize statistics up to date
    _updating_task = ::dsn::tasking::enqueue_timer(
        UPDATING_ROCKSDB_SSTSIZE,
        this,
        [this]() { this->updating_rocksdb_sstsize(); },
        std::chrono::seconds(_updating_rocksdb_sstsize_interval_seconds),
        0,
        std::chrono::seconds(30));

    return ::dsn::ERR_OK;
}

// Closes the app: optionally flushes the memtable, clears the scan context
// cache, cancels the sstsize timer, destroys the db object and forgets all
// known checkpoints.
//
// clear_state: when true the memtable is not flushed, the whole data directory
// is removed and the sst perf counters are reset to 0.
// Returns ERR_OK, or ERR_FILE_OPERATION_FAILED if the data directory cannot be
// removed.
::dsn::error_code pegasus_server_impl::stop(bool clear_state)
{
    if (!_is_open) {
        dassert(_db == nullptr, "");
        dassert(!clear_state, "should not be here if do clear");
        return ::dsn::ERR_OK;
    }

    if (!clear_state) {
        // the data is kept, so flush the memtable and wait for it
        rocksdb::FlushOptions flush_opts;
        flush_opts.wait = true;
        auto flush_status = _db->Flush(flush_opts);
        if (!(flush_status.ok() || flush_status.IsNoNeedOperate())) {
            derror("%s: flush memtable on close failed: %s",
                   replica_name(),
                   flush_status.ToString().c_str());
        }
    }

    _context_cache.clear();

    // the timer task must be stopped together with the app
    if (_updating_task != nullptr) {
        _updating_task->cancel(true);
    }

    _is_open = false;
    delete _db;
    _db = nullptr;

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        _checkpoints.clear();
        set_last_durable_decree(0);
    }

    if (clear_state && !::dsn::utils::filesystem::remove_path(data_dir())) {
        derror("%s: clear directory %s failed when stop app", replica_name(), data_dir().c_str());
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }
    if (clear_state) {
        _pfc_sst_count->set(0);
        _pfc_sst_size->set(0);
    }

    ddebug(
        "%s: close app succeed, clear_state = %s", replica_name(), clear_state ? "true" : "false");
    return ::dsn::ERR_OK;
}

// Scoped guard around an atomic "checkpointing in progress" flag.
//
// The constructor tries to switch the flag from false to true; token_got()
// tells whether this instance won. The destructor switches the flag back to
// false only if this instance acquired it.
class CheckpointingTokenHelper
{
public:
    // explicit: an atomic flag must never silently convert into a guard.
    explicit CheckpointingTokenHelper(std::atomic_bool &flag) : _flag(flag)
    {
        bool expected = false;
        _token_got = _flag.compare_exchange_strong(expected, true);
    }
    ~CheckpointingTokenHelper()
    {
        if (_token_got)
            _flag.store(false);
    }

    // Non-copyable: a copy would carry _token_got along and release the flag a
    // second time while the original holder may still be running.
    CheckpointingTokenHelper(const CheckpointingTokenHelper &) = delete;
    CheckpointingTokenHelper &operator=(const CheckpointingTokenHelper &) = delete;

    // Returns true if this instance switched the flag from false to true.
    bool token_got() const { return _token_got; }
private:
    std::atomic_bool &_flag;
    bool _token_got;
};

1796
// Creates a checkpoint at the last committed decree in
// <data_dir>/checkpoint.<decree> and records it as the new last durable decree.
//
// Returns:
//   ERR_WRONG_TIMING          - another checkpoint operation holds the token
//   ERR_NO_NEED_OPERATE       - last durable decree already equals last commit
//   ERR_FILE_OPERATION_FAILED - a stale checkpoint directory cannot be removed
//   ERR_LOCAL_APP_FAILURE     - rocksdb failed to create the checkpoint
//   ERR_OK                    - checkpoint created
::dsn::error_code pegasus_server_impl::sync_checkpoint()
{
    CheckpointingTokenHelper token_helper(_is_checkpointing);
    if (!token_helper.token_got())
        return ::dsn::ERR_WRONG_TIMING;

    int64_t last_commit = last_committed_decree();
    if (last_durable_decree() == last_commit)
        return ::dsn::ERR_NO_NEED_OPERATE;

    rocksdb::Checkpoint *raw_chkpt = nullptr;
    auto status = rocksdb::Checkpoint::Create(_db, &raw_chkpt);
    // own the Checkpoint object so that every return path below releases it
    std::unique_ptr<rocksdb::Checkpoint> chkpt(raw_chkpt);
    if (!status.ok()) {
        derror("%s: create Checkpoint object failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    auto dir = chkpt_get_dir_name(last_commit);
    auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), dir);
    if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
        ddebug("%s: checkpoint directory %s already exist, remove it first",
               replica_name(),
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
            derror(
                "%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    status = chkpt->CreateCheckpoint(chkpt_dir);
    if (!status.ok()) {
        // sometimes checkpoint may fail, and try again will succeed
        derror("%s: create checkpoint failed, error = %s, try again",
               replica_name(),
               status.ToString().c_str());
        status = chkpt->CreateCheckpoint(chkpt_dir);
    }

    // destroy Checkpoint object before touching the directory again
    chkpt.reset();

    if (!status.ok()) {
        derror(
            "%s: create checkpoint failed, error = %s", replica_name(), status.ToString().c_str());
        // remove the damaged directory exactly once and report if that fails
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
            derror("%s: remove damaged checkpoint directory %s failed",
                   replica_name(),
                   chkpt_dir.c_str());
        }
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        dassert(last_commit > last_durable_decree(),
                "%" PRId64 " VS %" PRId64 "",
                last_commit,
                last_durable_decree());
        if (!_checkpoints.empty()) {
            dassert(last_commit > _checkpoints.back(),
                    "%" PRId64 " VS %" PRId64 "",
                    last_commit,
                    _checkpoints.back());
        }
        _checkpoints.push_back(last_commit);
        set_last_durable_decree(_checkpoints.back());
    }

    ddebug("%s: sync create checkpoint succeed, last_durable_decree = %" PRId64 "",
           replica_name(),
           last_durable_decree());

    gc_checkpoints();

    return ::dsn::ERR_OK;
}

// Must be thread safe.
1881
::dsn::error_code pegasus_server_impl::async_checkpoint(bool is_emergency)
Q
qinzuoyan 已提交
1882 1883 1884 1885 1886
{
    CheckpointingTokenHelper token_helper(_is_checkpointing);
    if (!token_helper.token_got())
        return ::dsn::ERR_WRONG_TIMING;

1887 1888 1889 1890
    // last_durable_decree must less than or equal to _db->GetLastFlushedDecree()
    // if last_durable_decree == _db->GetLastFlushedDecree(), we should flush it first
    int64_t last_flushed_decree = static_cast<int64_t>(_db->GetLastFlushedDecree());
    if (last_durable_decree() == last_flushed_decree) {
Q
qinzuoyan 已提交
1891 1892 1893 1894 1895 1896
        if (is_emergency) {
            // trigger flushing memtable, but not wait
            rocksdb::FlushOptions options;
            options.wait = false;
            auto status = _db->Flush(options);
            if (status.ok()) {
1897
                ddebug("%s: trigger flushing memtable succeed", replica_name());
Q
qinzuoyan 已提交
1898 1899
                return ::dsn::ERR_TRY_AGAIN;
            } else if (status.IsNoNeedOperate()) {
1900
                dwarn("%s: trigger flushing memtable failed, no memtable to flush", replica_name());
Q
qinzuoyan 已提交
1901 1902 1903
                return ::dsn::ERR_NO_NEED_OPERATE;
            } else {
                derror("%s: trigger flushing memtable failed, error = %s",
1904
                       replica_name(),
Q
qinzuoyan 已提交
1905 1906 1907 1908 1909 1910 1911 1912
                       status.ToString().c_str());
                return ::dsn::ERR_LOCAL_APP_FAILURE;
            }
        } else {
            return ::dsn::ERR_NO_NEED_OPERATE;
        }
    }

1913 1914 1915 1916
    dassert(last_durable_decree() < last_flushed_decree,
            "%" PRId64 " VS %" PRId64 "",
            last_durable_decree(),
            last_flushed_decree);
Q
qinzuoyan 已提交
1917 1918 1919

    char buf[256];
    sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us());
1920
    std::string tmp_dir = ::dsn::utils::filesystem::path_combine(data_dir(), buf);
Q
qinzuoyan 已提交
1921 1922
    if (::dsn::utils::filesystem::directory_exists(tmp_dir)) {
        ddebug("%s: temporary checkpoint directory %s already exist, remove it first",
1923
               replica_name(),
Q
qinzuoyan 已提交
1924 1925 1926
               tmp_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
            derror("%s: remove temporary checkpoint directory %s failed",
1927
                   replica_name(),
Q
qinzuoyan 已提交
1928 1929 1930 1931
                   tmp_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }
1932

1933 1934 1935 1936
    int64_t checkpoint_decree = 0;
    ::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree);
    if (err != ::dsn::ERR_OK) {
        derror("%s: call copy_checkpoint_to_dir_unsafe failed with err = %s",
1937
               replica_name(),
1938 1939
               err.to_string());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
Q
qinzuoyan 已提交
1940 1941
    }

1942
    auto chkpt_dir =
1943
        ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(checkpoint_decree));
Q
qinzuoyan 已提交
1944 1945
    if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
        ddebug("%s: checkpoint directory %s already exist, remove it first",
1946
               replica_name(),
Q
qinzuoyan 已提交
1947 1948
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
1949 1950
            derror(
                "%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
Q
qinzuoyan 已提交
1951 1952
            if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
                derror("%s: remove temporary checkpoint directory %s failed",
1953
                       replica_name(),
Q
qinzuoyan 已提交
1954 1955 1956 1957 1958 1959 1960 1961
                       tmp_dir.c_str());
            }
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    if (!::dsn::utils::filesystem::rename_path(tmp_dir, chkpt_dir)) {
        derror("%s: rename checkpoint directory from %s to %s failed",
1962
               replica_name(),
Q
qinzuoyan 已提交
1963 1964 1965 1966
               tmp_dir.c_str(),
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
            derror("%s: remove temporary checkpoint directory %s failed",
1967
                   replica_name(),
Q
qinzuoyan 已提交
1968 1969 1970 1971 1972 1973 1974
                   tmp_dir.c_str());
        }
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
1975 1976 1977 1978
        dassert(checkpoint_decree > last_durable_decree(),
                "%" PRId64 " VS %" PRId64 "",
                checkpoint_decree,
                last_durable_decree());
Q
qinzuoyan 已提交
1979
        if (!_checkpoints.empty()) {
1980 1981 1982 1983
            dassert(checkpoint_decree > _checkpoints.back(),
                    "%" PRId64 " VS %" PRId64 "",
                    checkpoint_decree,
                    _checkpoints.back());
Q
qinzuoyan 已提交
1984
        }
1985
        _checkpoints.push_back(checkpoint_decree);
Q
qinzuoyan 已提交
1986 1987 1988 1989
        set_last_durable_decree(_checkpoints.back());
    }

    ddebug("%s: async create checkpoint succeed, last_durable_decree = %" PRId64 "",
1990
           replica_name(),
Q
qinzuoyan 已提交
1991 1992 1993 1994 1995 1996 1997
           last_durable_decree());

    gc_checkpoints();

    return ::dsn::ERR_OK;
}

1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019
// Must be thread safe.
//
// Token-guarded entry point of copy_checkpoint_to_dir_unsafe(): the copy only
// runs when the checkpointing token can be acquired, otherwise
// ERR_WRONG_TIMING is returned. The decree of the copied checkpoint is written
// to *last_decree when it is not null.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir,
                                                              /*output*/ int64_t *last_decree)
{
    CheckpointingTokenHelper token_helper(_is_checkpointing);
    if (token_helper.token_got()) {
        return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree);
    }

    return ::dsn::ERR_WRONG_TIMING;
}

// not thread safe, should be protected by caller
//
// Creates a quick checkpoint of the db in checkpoint_dir; an already existing
// directory of that name is removed first.
//
// checkpoint_dir:    target directory of the checkpoint
// checkpoint_decree: optional output, receives the decree reported by
//                    CreateCheckpointQuick()
// Returns ERR_OK on success, ERR_FILE_OPERATION_FAILED when the existing
// directory cannot be removed, ERR_LOCAL_APP_FAILURE when rocksdb fails.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
                                                                     int64_t *checkpoint_decree)
{
    rocksdb::Checkpoint *raw_chkpt = nullptr;
    rocksdb::Status status = rocksdb::Checkpoint::Create(_db, &raw_chkpt);
    // own the Checkpoint object so that no return path can leak it
    std::unique_ptr<rocksdb::Checkpoint> chkpt(raw_chkpt);
    if (!status.ok()) {
        derror("%s: create Checkpoint object failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
        ddebug("%s: checkpoint directory %s is already exist, remove it first",
               replica_name(),
               checkpoint_dir);
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
            // chkpt is released by its owner here
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    uint64_t ci = 0;
    status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci);
    // destroy the Checkpoint object before any cleanup of the directory
    chkpt.reset();

    if (!status.ok()) {
        derror("%s: async create checkpoint failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
        }
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // ci is unsigned; convert once so that the value matches the PRId64 format
    const int64_t decree = static_cast<int64_t>(ci);
    ddebug("%s: copy checkpoint to dir(%s) succeed, last_decree = %" PRId64 "",
           replica_name(),
           checkpoint_dir,
           decree);
    if (checkpoint_decree != nullptr) {
        *checkpoint_decree = decree;
    }

    return ::dsn::ERR_OK;
}

Q
qinzuoyan 已提交
2060
// Fills `state` with the files of the last durable checkpoint.
//
// learn_start and learn_request are not used by this implementation.
// Returns ERR_OBJECT_NOT_FOUND when the last durable decree is 0,
// ERR_FILE_OPERATION_FAILED when the checkpoint directory cannot be listed,
// ERR_OK otherwise.
::dsn::error_code pegasus_server_impl::get_checkpoint(int64_t learn_start,
                                                      const dsn::blob &learn_request,
                                                      dsn::replication::learn_state &state)
{
    dassert(_is_open, "");

    const int64_t durable_decree = last_durable_decree();
    if (durable_decree == 0) {
        derror("%s: no checkpoint found", replica_name());
        return ::dsn::ERR_OBJECT_NOT_FOUND;
    }

    const std::string chkpt_name = chkpt_get_dir_name(durable_decree);
    auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_name);

    state.files.clear();
    const bool listed = ::dsn::utils::filesystem::get_subfiles(chkpt_dir, state.files, true);
    if (!listed) {
        derror("%s: list files in checkpoint dir %s failed", replica_name(), chkpt_dir.c_str());
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    // the whole checkpoint is handed over, so the range starts at 0
    state.from_decree_excluded = 0;
    state.to_decree_included = durable_decree;

    ddebug("%s: get checkpoint succeed, from_decree_excluded = 0, to_decree_included = %" PRId64 "",
           replica_name(),
           state.to_decree_included);
    return ::dsn::ERR_OK;
}

2088 2089 2090
// Applies a learned checkpoint described by `state`.
//
// In chkpt_apply_mode::copy the learned directory is renamed to
// <data_dir>/checkpoint.<decree> and registered as the newest checkpoint; the
// running db is left untouched.
// In any other mode the app is stopped with its state cleared, the data
// directory is wiped, the learned files (if any) become <data_dir>/rdb and the
// db is started again.
//
// Returns ERR_OK on success, ERR_FILE_OPERATION_FAILED on directory operation
// failures, or the error returned by stop()/start().
::dsn::error_code
pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode,
                                              const dsn::replication::learn_state &state)
{
    ::dsn::error_code err;
    int64_t ci = state.to_decree_included;

    if (mode == chkpt_apply_mode::copy) {
        dassert(ci > last_durable_decree(),
                "state.to_decree_included(%" PRId64 ") <= last_durable_decree(%" PRId64 ")",
                ci,
                last_durable_decree());

        // NOTE(review): state.files[0] is read without an emptiness check; this
        // relies on the caller passing at least one file in copy mode.
        auto learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
        auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(ci));
        if (::dsn::utils::filesystem::rename_path(learn_dir, chkpt_dir)) {
            ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
            dassert(ci > last_durable_decree(),
                    "%" PRId64 " VS %" PRId64 "",
                    ci,
                    last_durable_decree());
            // compare against the previous newest checkpoint BEFORE appending;
            // after push_back the back element is ci itself and the check
            // could never hold.
            if (!_checkpoints.empty()) {
                dassert(ci > _checkpoints.back(),
                        "%" PRId64 " VS %" PRId64 "",
                        ci,
                        _checkpoints.back());
            }
            _checkpoints.push_back(ci);
            set_last_durable_decree(ci);
            err = ::dsn::ERR_OK;
        } else {
            derror("%s: rename directory %s to %s failed",
                   replica_name(),
                   learn_dir.c_str(),
                   chkpt_dir.c_str());
            err = ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        return err;
    }

    if (_is_open) {
        err = stop(true);
        if (err != ::dsn::ERR_OK) {
            // the format must consume exactly the two arguments passed below
            derror("%s: close rocksdb failed, error = %s", replica_name(), err.to_string());
            return err;
        }
    }

    // clear data dir
    if (!::dsn::utils::filesystem::remove_path(data_dir())) {
        derror("%s: clear data directory %s failed", replica_name(), data_dir().c_str());
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    // reopen the db with the new checkpoint files
    if (state.files.size() > 0) {
        // create data dir
        if (!::dsn::utils::filesystem::create_directory(data_dir())) {
            derror("%s: create data directory %s failed", replica_name(), data_dir().c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        // move learned files from learn_dir to data_dir/rdb
        std::string learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
        std::string new_dir = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
        if (!::dsn::utils::filesystem::rename_path(learn_dir, new_dir)) {
            derror("%s: rename directory %s to %s failed",
                   replica_name(),
                   learn_dir.c_str(),
                   new_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        err = start(0, nullptr);
    } else {
        ddebug("%s: apply empty checkpoint, create new rocksdb", replica_name());
        err = start(0, nullptr);
    }

    if (err != ::dsn::ERR_OK) {
        derror("%s: open rocksdb failed, error = %s", replica_name(), err.to_string());
        return err;
    }

    dassert(_is_open, "");
    dassert(ci == last_durable_decree(), "%" PRId64 " VS %" PRId64 "", ci, last_durable_decree());

    ddebug("%s: apply checkpoint succeed, last_durable_decree = %" PRId64,
           replica_name(),
           last_durable_decree());
    return ::dsn::ERR_OK;
}

2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235
// Tells whether `filter_type` falls inside the closed range
// [FT_NO_FILTER, FT_MATCH_POSTFIX].
bool pegasus_server_impl::is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
{
    // below the smallest accepted value
    if (filter_type < ::dsn::apps::filter_type::FT_NO_FILTER) {
        return false;
    }
    // accepted unless it is above the largest accepted value
    return !(filter_type > ::dsn::apps::filter_type::FT_MATCH_POSTFIX);
}

// Checks whether `value` passes the filter described by `filter_type` and `filter_pattern`.
//
// - FT_NO_FILTER, or an empty pattern: always passes.
// - A value shorter than the pattern never passes.
// - FT_MATCH_ANYWHERE: passes if the pattern occurs anywhere inside the value.
// - FT_MATCH_PREFIX:   passes if the value starts with the pattern.
// - FT_MATCH_POSTFIX:  passes if the value ends with the pattern.
// Any other filter type triggers a dassert and, if that returns, yields false.
bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type,
                                          const ::dsn::blob &filter_pattern,
                                          const ::dsn::blob &value)
{
    if (filter_type == ::dsn::apps::filter_type::FT_NO_FILTER || filter_pattern.length() == 0)
        return true;
    if (value.length() < filter_pattern.length())
        return false;

    // From here on the pattern is non-empty and not longer than the value, so every
    // pointer computed below stays inside [value.data(), value.data() + value.length()].
    const char *value_begin = value.data();
    const char *value_end = value_begin + value.length();
    const char *pattern_begin = filter_pattern.data();
    const char *pattern_end = pattern_begin + filter_pattern.length();

    switch (filter_type) {
    case ::dsn::apps::filter_type::FT_MATCH_ANYWHERE:
        // Plain substring search through the standard library; this works on pointer
        // ranges, so the lengths are never narrowed to int.
        // TODO: improve it according to
        //   http://old.blog.phusion.nl/2010/12/06/efficient-substring-searching/
        return std::search(value_begin, value_end, pattern_begin, pattern_end) != value_end;
    case ::dsn::apps::filter_type::FT_MATCH_PREFIX:
        return (memcmp(value_begin, pattern_begin, filter_pattern.length()) == 0);
    case ::dsn::apps::filter_type::FT_MATCH_POSTFIX:
        // compare the last filter_pattern.length() bytes of the value
        return (memcmp(value_end - filter_pattern.length(),
                       pattern_begin,
                       filter_pattern.length()) == 0);
    default:
        dassert(false, "unsupported filter type: %d", filter_type);
    }
    return false;
}

// Tries to append one rocksdb record (`key`, `value`) to `kvs` on behalf of a scan.
//
// Return value:
//   1 - the record was appended to `kvs`
//   2 - the record is expired (its expire timestamp is non-zero and <= `epoch_now`)
//   3 - the record was rejected by the hash key filter or the sort key filter
//
// The appended key is a copy of the whole raw rocksdb key. The value is left empty
// when `no_value` is true; otherwise the user data is extracted from `value`.
int pegasus_server_impl::append_key_value_for_scan(
    std::vector<::dsn::apps::key_value> &kvs,
    const rocksdb::Slice &key,
    const rocksdb::Slice &value,
    ::dsn::apps::filter_type::type hash_key_filter_type,
    const ::dsn::blob &hash_key_filter_pattern,
    ::dsn::apps::filter_type::type sort_key_filter_type,
    const ::dsn::blob &sort_key_filter_pattern,
    uint32_t epoch_now,
    bool no_value)
{
    // skip records that have already expired
    const uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
    const bool expired = (expire_ts > 0) && (expire_ts <= epoch_now);
    if (expired) {
        if (_verbose_log) {
            derror("%s: rocksdb data expired for scan", replica_name());
        }
        return 2;
    }

    // extract raw key
    ::dsn::blob raw_key(key.data(), 0, key.size());

    // the key is only split into hash key / sort key when some filter needs it
    const bool filter_hash_key = hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER;
    const bool filter_sort_key = sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER;
    if (filter_hash_key || filter_sort_key) {
        ::dsn::blob hash_key, sort_key;
        pegasus_restore_key(raw_key, hash_key, sort_key);

        if (filter_hash_key &&
            !validate_filter(hash_key_filter_type, hash_key_filter_pattern, hash_key)) {
            if (_verbose_log) {
                derror("%s: hash key filtered for scan", replica_name());
            }
            return 3;
        }

        if (filter_sort_key &&
            !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
            if (_verbose_log) {
                derror("%s: sort key filtered for scan", replica_name());
            }
            return 3;
        }
    }

    ::dsn::apps::key_value kv;

    // copy the raw key into a buffer owned by the result entry
    std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
    ::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
    kv.key.assign(std::move(key_buf), 0, raw_key.length());

    // extract value
    if (!no_value) {
        std::unique_ptr<std::string> value_buf(new std::string(value.data(), value.size()));
        pegasus_extract_user_data(_value_schema_version, std::move(value_buf), kv.value);
    }

    kvs.emplace_back(std::move(kv));
    return 1;
}

2282 2283 2284 2285 2286 2287 2288 2289
// Tries to append one rocksdb record (`key`, `value`) to `kvs` on behalf of a multi get.
//
// Return value:
//   1 - the record was appended to `kvs`
//   2 - the record is expired (its expire timestamp is non-zero and <= `epoch_now`)
//   3 - the record was rejected by the sort key filter
//
// Only the sort key part of the raw key is copied into the appended entry. The value
// is left empty when `no_value` is true; otherwise the user data is extracted from `value`.
int pegasus_server_impl::append_key_value_for_multi_get(
    std::vector<::dsn::apps::key_value> &kvs,
    const rocksdb::Slice &key,
    const rocksdb::Slice &value,
    ::dsn::apps::filter_type::type sort_key_filter_type,
    const ::dsn::blob &sort_key_filter_pattern,
    uint32_t epoch_now,
    bool no_value)
{
    // skip records that have already expired
    const uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
    const bool expired = (expire_ts > 0) && (expire_ts <= epoch_now);
    if (expired) {
        if (_verbose_log) {
            derror("%s: rocksdb data expired for multi get", replica_name());
        }
        return 2;
    }

    // extract sort_key
    ::dsn::blob raw_key(key.data(), 0, key.size());
    ::dsn::blob hash_key, sort_key;
    pegasus_restore_key(raw_key, hash_key, sort_key);

    const bool filter_sort_key = sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER;
    if (filter_sort_key &&
        !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
        if (_verbose_log) {
            derror("%s: sort key filtered for multi get", replica_name());
        }
        return 3;
    }

    ::dsn::apps::key_value kv;

    // copy the sort key into a buffer owned by the result entry
    std::shared_ptr<char> sort_key_buf(::dsn::utils::make_shared_array<char>(sort_key.length()));
    ::memcpy(sort_key_buf.get(), sort_key.data(), sort_key.length());
    kv.key.assign(std::move(sort_key_buf), 0, sort_key.length());

    // extract value
    if (!no_value) {
        std::unique_ptr<std::string> value_buf(new std::string(value.data(), value.size()));
        pegasus_extract_user_data(_value_schema_version, std::move(value_buf), kv.value);
    }

    kvs.emplace_back(std::move(kv));
    return 1;
}

// Collects statistics about the files under `path` whose names end with `type`
// (e.g. ".sst"): returns (number of matching files, sum of their sizes).
// Returns (-1, -1) if listing the directory or querying any matching file's size fails.
static std::pair<int64_t, int64_t> get_type_file_size(const std::string &path,
                                                      const std::string &type)
{
    const std::pair<int64_t, int64_t> failure(-1, -1);

    std::vector<std::string> entries;
    if (!::dsn::utils::filesystem::get_subfiles(path, entries, false)) {
        dwarn("get subfiles of dir %s failed", path.c_str());
        return failure;
    }

    int64_t total_size = 0;
    int64_t file_count = 0;
    for (auto &entry : entries) {
        // an entry only matches when it is strictly longer than the suffix ...
        if (entry.length() <= type.length()) {
            continue;
        }
        // ... and its tail equals the suffix
        if (entry.compare(entry.length() - type.length(), type.length(), type) != 0) {
            continue;
        }

        int64_t one_size = 0;
        if (!::dsn::utils::filesystem::file_size(entry, one_size)) {
            dwarn("get size of file %s failed", entry.c_str());
            return failure;
        }
        total_size += one_size;
        ++file_count;
    }
    return std::make_pair(file_count, total_size);
}

std::pair<int64_t, int64_t> pegasus_server_impl::statistic_sst_size()
{
2354 2355
    // dir = data_dir()/rdb
    return get_type_file_size(::dsn::utils::filesystem::path_combine(data_dir(), "rdb"), ".sst");
Q
qinzuoyan 已提交
2356 2357 2358 2359 2360 2361
}

void pegasus_server_impl::updating_rocksdb_sstsize()
{
    std::pair<int64_t, int64_t> sst_size = statistic_sst_size();
    if (sst_size.first == -1) {
2362
        dwarn("%s: statistic sst file size failed", replica_name());
Q
qinzuoyan 已提交
2363 2364 2365 2366
    } else {
        int64_t sst_size_mb = sst_size.second / 1048576;
        ddebug("%s: statistic sst file size succeed, sst_count = %" PRId64 ", sst_size = %" PRId64
               "(%" PRId64 "MB)",
2367
               replica_name(),
Q
qinzuoyan 已提交
2368 2369 2370
               sst_size.first,
               sst_size.second,
               sst_size_mb);
2371 2372
        _pfc_sst_count->set(sst_size.first);
        _pfc_sst_size->set(sst_size_mb);
Q
qinzuoyan 已提交
2373 2374
    }
}
C
cailiuyang 已提交
2375 2376 2377 2378 2379 2380

std::pair<std::string, bool> pegasus_server_impl::get_restore_dir_from_env(int argc, char **argv)
{
    std::pair<std::string, bool> res;
    res.first = std::string();
    res.second = false;
2381 2382
    if (argc <= 0 || ((argc - 1) % 2 != 0) || argv == nullptr) {
        derror("%s: parse restore info from env failed, because invalid argc = %d",
2383
               replica_name(),
2384
               argc);
C
cailiuyang 已提交
2385 2386 2387 2388
        return res;
    }
    // env is compounded in replication_app_base::open() function
    std::map<std::string, std::string> env_kvs;
2389 2390 2391 2392 2393
    int idx = 1;
    while (idx < argc) {
        std::string key = std::string(argv[idx++]);
        std::string value = std::string(argv[idx++]);
        env_kvs.insert(std::make_pair(key, value));
C
cailiuyang 已提交
2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407
    }

    auto it = env_kvs.find("force_restore");
    if (it != env_kvs.end()) {
        res.second = true;
    }

    std::stringstream os;
    os << "restore.";
    // TODO: using cold_backup_constant to replace
    // Attention: "policy_name" and "backup_id" must equal to cold_backup_constant::POLICY_NAME
    //             and cold_backup_constant::BACKUP_ID
    it = env_kvs.find("policy_name");
    if (it != env_kvs.end()) {
2408 2409
        ddebug(
            "%s: find policy_name from env, policy_name = %s", replica_name(), it->second.c_str());
C
cailiuyang 已提交
2410 2411 2412 2413 2414 2415
        os << it->second << ".";
    } else {
        return res;
    }
    it = env_kvs.find("backup_id");
    if (it != env_kvs.end()) {
2416
        ddebug("%s: find backup_id from env, backup_id = %s", replica_name(), it->second.c_str());
C
cailiuyang 已提交
2417 2418 2419 2420
        os << it->second;
    } else {
        return res;
    }
2421
    std::string parent_dir = ::dsn::utils::filesystem::remove_file_name(data_dir());
C
cailiuyang 已提交
2422 2423 2424
    res.first = ::dsn::utils::filesystem::path_combine(parent_dir, os.str());
    return res;
}
A
acelyc111 已提交
2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437

void pegasus_server_impl::manual_compact()
{
    ddebug("%s: start to CompactRange", replica_name());
    rocksdb::CompactRangeOptions options;
    options.exclusive_manual_compaction = true;
    options.change_level = true;
    options.target_level = -1;
    options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
    _db->CompactRange(options, nullptr, nullptr);
    ddebug("%s: CompactRange finished", replica_name());
}

Q
qinzuoyan 已提交
2438 2439
}
} // namespace