pegasus_server_impl.cpp 108.0 KB
Newer Older
Q
qinzuoyan 已提交
1 2 3 4 5
// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "pegasus_server_impl.h"
6
#include <pegasus_const.h>
Q
qinzuoyan 已提交
7 8 9
#include <pegasus_key_schema.h>
#include <pegasus_value_schema.h>
#include <pegasus_utils.h>
10
#include <dsn/utility/smart_pointers.h>
Q
qinzuoyan 已提交
11
#include <dsn/utility/utils.h>
12
#include <dsn/utility/filesystem.h>
Q
qinzuoyan 已提交
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/table.h>
#include <boost/lexical_cast.hpp>
#include <algorithm>

namespace pegasus {
namespace server {

// The INCR operator has been removed, but its task code must still be registered for
// compatibility: older mutation log entries may contain this code. Even though such entries
// need not be applied to rocksdb, they may still be deserialized.
DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_INCR, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)

// Returns the directory name used for the checkpoint taken at `decree`,
// e.g. chkpt_get_dir_name(42) == "checkpoint.42".
static std::string chkpt_get_dir_name(int64_t decree)
{
    char buffer[256];
    snprintf(buffer, sizeof(buffer), "checkpoint.%" PRId64, decree);
    return std::string(buffer);
}

// Parses a checkpoint directory name produced by chkpt_get_dir_name().
//
// Returns true and stores the parsed decree into `decree` only when `name` is exactly the
// canonical form. Names such as "checkpoint.042" or "checkpoint.42.tmp" are rejected, because
// re-formatting the parsed decree does not reproduce `name`.
// Note: `decree` may be overwritten even when false is returned.
static bool chkpt_init_from_dir(const char *name, int64_t &decree)
{
    // SCNd64 (not PRId64) is the scanf conversion specifier for int64_t.
    return 1 == sscanf(name, "checkpoint.%" SCNd64, &decree) &&
           std::string(name) == chkpt_get_dir_name(decree);
}

40 41
// Constructs the rrdb service for replica `r`.
//
// Reads the [pegasus.server] config section to initialize the abnormal-read thresholds, the
// rocksdb DB/write options, the checkpoint reserve settings and the sst-size update interval.
// Then registers the per-partition perf counters, named "<counter>@<app_id>.<partition_index>",
// and finally calls updating_rocksdb_sstsize().
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
    : dsn::apps::rrdb_service(r),
      _usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL),
      _db(nullptr),
      _is_open(false),
      _value_schema_version(0),
      _last_durable_decree(0),
      _physical_error(0),
      _is_checkpointing(false)
{
    _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
    _gpid = get_gpid();
    _verbose_log = dsn_config_get_value_bool("pegasus.server",
                                             "rocksdb_verbose_log",
                                             false,
                                             "print verbose log for debugging, default is false");
    _abnormal_get_time_threshold_ns = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_get_time_threshold_ns",
        0,
        "rocksdb_abnormal_get_time_threshold_ns, default is 0, means no check");
    _abnormal_get_size_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_get_size_threshold",
        0,
        "rocksdb_abnormal_get_size_threshold, default is 0, means no check");
    _abnormal_multi_get_time_threshold_ns = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_time_threshold_ns",
        0,
        "rocksdb_abnormal_multi_get_time_threshold_ns, default is 0, means no check");
    _abnormal_multi_get_size_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_size_threshold",
        0,
        "rocksdb_abnormal_multi_get_size_threshold, default is 0, means no check");
    _abnormal_multi_get_iterate_count_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_iterate_count_threshold",
        0,
        "rocksdb_abnormal_multi_get_iterate_count_threshold, default is 0, means no check");

    // init db options

    // rocksdb default: 4MB
    _db_opts.write_buffer_size =
        (size_t)dsn_config_get_value_uint64("pegasus.server",
                                            "rocksdb_write_buffer_size",
                                            64 * 1024 * 1024,
                                            "rocksdb options.write_buffer_size, default 64MB");

    // rocksdb default: 2
    _db_opts.max_write_buffer_number =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_max_write_buffer_number",
                                        6,
                                        "rocksdb options.max_write_buffer_number, default 6");

    // rocksdb default: -1
    // flush threads are shared among all rocksdb instances in one process.
    _db_opts.max_background_flushes =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_max_background_flushes",
                                        4,
                                        "rocksdb options.max_background_flushes, default 4");

    // rocksdb default: -1
    // compaction threads are shared among all rocksdb instances in one process.
    _db_opts.max_background_compactions =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_max_background_compactions",
                                        12,
                                        "rocksdb options.max_background_compactions, default 12");

    // rocksdb default: 7
    _db_opts.num_levels = (int)dsn_config_get_value_int64(
        "pegasus.server", "rocksdb_num_levels", 6, "rocksdb options.num_levels, default 6");

    // rocksdb default: 2MB
    _db_opts.target_file_size_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_target_file_size_base",
                                    64 * 1024 * 1024,
                                    "rocksdb options.target_file_size_base, default 64MB");

    // rocksdb default: 1
    _db_opts.target_file_size_multiplier =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_target_file_size_multiplier",
                                        1,
                                        "rocksdb options.target_file_size_multiplier, default 1");

    // rocksdb default: 10MB
    _db_opts.max_bytes_for_level_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_bytes_for_level_base",
                                    10 * 64 * 1024 * 1024,
                                    "rocksdb options.max_bytes_for_level_base, default 640MB");

    // rocksdb default: 10
    _db_opts.max_bytes_for_level_multiplier = dsn_config_get_value_double(
        "pegasus.server",
        "rocksdb_max_bytes_for_level_multiplier",
        10,
        "rocksdb options.rocksdb_max_bytes_for_level_multiplier, default 10");

    // we need set max_compaction_bytes definitely because set_usage_scenario() depends on it.
    _db_opts.max_compaction_bytes = _db_opts.target_file_size_base * 25;

    // rocksdb default: 4
    _db_opts.level0_file_num_compaction_trigger =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_level0_file_num_compaction_trigger",
                                        4,
                                        "rocksdb options.level0_file_num_compaction_trigger, 4");

    // rocksdb default: 20
    _db_opts.level0_slowdown_writes_trigger = (int)dsn_config_get_value_int64(
        "pegasus.server",
        "rocksdb_level0_slowdown_writes_trigger",
        30,
        "rocksdb options.level0_slowdown_writes_trigger, default 30");

    // rocksdb default: 24
    _db_opts.level0_stop_writes_trigger =
        (int)dsn_config_get_value_int64("pegasus.server",
                                        "rocksdb_level0_stop_writes_trigger",
                                        60,
                                        "rocksdb options.level0_stop_writes_trigger, default 60");

    // disable table block cache, default: false
    if (dsn_config_get_value_bool("pegasus.server",
                                  "rocksdb_disable_table_block_cache",
                                  false,
                                  "rocksdb options.disable_table_block_cache, default false")) {
        rocksdb::BlockBasedTableOptions table_options;
        table_options.no_block_cache = true;
        table_options.block_restart_interval = 4;
        _db_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
    }

    // rocksdb default: snappy
    std::string compression_str = dsn_config_get_value_string(
        "pegasus.server",
        "rocksdb_compression_type",
        "snappy",
        "rocksdb options.compression, default snappy. Supported: none, snappy.");
    if (compression_str == "none") {
        _db_opts.compression = rocksdb::kNoCompression;
    } else if (compression_str == "snappy") {
        _db_opts.compression = rocksdb::kSnappyCompression;
    } else {
        // dassert's first argument is the condition: it must be `false` so that an
        // unsupported value aborts instead of being silently ignored.
        dassert(false, "unsupported compression type: %s", compression_str.c_str());
    }

    if (_db_opts.compression != rocksdb::kNoCompression) {
        // only compress levels >= 2
        // refer to ColumnFamilyOptions::OptimizeLevelStyleCompaction()
        _db_opts.compression_per_level.resize(_db_opts.num_levels);
        for (int i = 0; i < _db_opts.num_levels; ++i) {
            _db_opts.compression_per_level[i] =
                (i < 2) ? rocksdb::kNoCompression : _db_opts.compression;
        }
    }

    // disable write ahead logging as replication handles logging instead now
    _wt_opts.disableWAL = true;

    // get the checkpoint reserve options.
    _checkpoint_reserve_min_count = (uint32_t)dsn_config_get_value_uint64(
        "pegasus.server", "checkpoint_reserve_min_count", 3, "checkpoint_reserve_min_count");
    _checkpoint_reserve_time_seconds =
        (uint32_t)dsn_config_get_value_uint64("pegasus.server",
                                              "checkpoint_reserve_time_seconds",
                                              3600,
                                              "checkpoint_reserve_time_seconds");

    // get the _updating_sstsize_inteval_seconds.
    _updating_rocksdb_sstsize_interval_seconds =
        (uint32_t)dsn_config_get_value_uint64("pegasus.server",
                                              "updating_rocksdb_sstsize_interval_seconds",
                                              600,
                                              "updating_rocksdb_sstsize_interval_seconds");

    // TODO: move the qps/latency counters and it's statistics to replication_app_base layer
    char str_gpid[128], buf[256];
    snprintf(str_gpid, sizeof(str_gpid), "%d.%d", _gpid.get_app_id(), _gpid.get_partition_index());

    // register the perf counters
    snprintf(buf, sizeof(buf), "get_qps@%s", str_gpid);
    _pfc_get_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of GET request");

    snprintf(buf, sizeof(buf), "multi_get_qps@%s", str_gpid);
    _pfc_multi_get_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");

    snprintf(buf, sizeof(buf), "scan_qps@%s", str_gpid);
    _pfc_scan_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");

    snprintf(buf, sizeof(buf), "put_qps@%s", str_gpid);
    _pfc_put_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of PUT request");

    snprintf(buf, sizeof(buf), "multi_put_qps@%s", str_gpid);
    _pfc_multi_put_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request");

    snprintf(buf, sizeof(buf), "remove_qps@%s", str_gpid);
    _pfc_remove_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of REMOVE request");

    snprintf(buf, sizeof(buf), "multi_remove_qps@%s", str_gpid);
    _pfc_multi_remove_qps.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_RATE, "statistic the qps of MULTI_REMOVE request");

    snprintf(buf, sizeof(buf), "get_latency@%s", str_gpid);
    _pfc_get_latency.init_app_counter("app.pegasus",
                                      buf,
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of GET request");

    snprintf(buf, sizeof(buf), "multi_get_latency@%s", str_gpid);
    _pfc_multi_get_latency.init_app_counter("app.pegasus",
                                            buf,
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_GET request");

    snprintf(buf, sizeof(buf), "scan_latency@%s", str_gpid);
    _pfc_scan_latency.init_app_counter("app.pegasus",
                                       buf,
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of SCAN request");

    snprintf(buf, sizeof(buf), "put_latency@%s", str_gpid);
    _pfc_put_latency.init_app_counter("app.pegasus",
                                      buf,
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of PUT request");

    snprintf(buf, sizeof(buf), "multi_put_latency@%s", str_gpid);
    _pfc_multi_put_latency.init_app_counter("app.pegasus",
                                            buf,
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_PUT request");

    snprintf(buf, sizeof(buf), "remove_latency@%s", str_gpid);
    _pfc_remove_latency.init_app_counter("app.pegasus",
                                         buf,
                                         COUNTER_TYPE_NUMBER_PERCENTILES,
                                         "statistic the latency of REMOVE request");

    snprintf(buf, sizeof(buf), "multi_remove_latency@%s", str_gpid);
    _pfc_multi_remove_latency.init_app_counter("app.pegasus",
                                               buf,
                                               COUNTER_TYPE_NUMBER_PERCENTILES,
                                               "statistic the latency of MULTI_REMOVE request");

    snprintf(buf, sizeof(buf), "recent.expire.count@%s", str_gpid);
    _pfc_recent_expire_count.init_app_counter("app.pegasus",
                                              buf,
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent expired value read count");

    snprintf(buf, sizeof(buf), "recent.filter.count@%s", str_gpid);
    _pfc_recent_filter_count.init_app_counter("app.pegasus",
                                              buf,
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent filtered value read count");

    snprintf(buf, sizeof(buf), "recent.abnormal.count@%s", str_gpid);
    _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
                                                buf,
                                                COUNTER_TYPE_VOLATILE_NUMBER,
                                                "statistic the recent abnormal read count");

    snprintf(buf, sizeof(buf), "disk.storage.sst.count@%s", str_gpid);
    _pfc_sst_count.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");

    snprintf(buf, sizeof(buf), "disk.storage.sst(MB)@%s", str_gpid);
    _pfc_sst_size.init_app_counter(
        "app.pegasus", buf, COUNTER_TYPE_NUMBER, "statistic the size of sstable files");

    updating_rocksdb_sstsize();
}

void pegasus_server_impl::parse_checkpoints()
{
    std::vector<std::string> dirs;
334
    ::dsn::utils::filesystem::get_subdirectories(data_dir(), dirs, false);
Q
qinzuoyan 已提交
335 336 337 338 339 340

    ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);

    _checkpoints.clear();
    for (auto &d : dirs) {
        int64_t ci;
341
        std::string d1 = d.substr(data_dir().size() + 1);
Q
qinzuoyan 已提交
342 343 344
        if (chkpt_init_from_dir(d1.c_str(), ci)) {
            _checkpoints.push_back(ci);
        } else if (d1.find("checkpoint") != std::string::npos) {
345
            ddebug("%s: invalid checkpoint directory %s, remove it", replica_name(), d.c_str());
Q
qinzuoyan 已提交
346 347
            ::dsn::utils::filesystem::remove_path(d);
            if (!::dsn::utils::filesystem::remove_path(d)) {
348 349
                derror(
                    "%s: remove invalid checkpoint directory %s failed", replica_name(), d.c_str());
Q
qinzuoyan 已提交
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
            }
        }
    }

    if (!_checkpoints.empty()) {
        std::sort(_checkpoints.begin(), _checkpoints.end());
        set_last_durable_decree(_checkpoints.back());
    } else {
        set_last_durable_decree(0);
    }
}

void pegasus_server_impl::gc_checkpoints()
{
    std::deque<int64_t> temp_list;
    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        if (_checkpoints.size() <= _checkpoint_reserve_min_count)
            return;
        temp_list = _checkpoints;
    }

    // find the max checkpoint which can be deleted
    int64_t max_del_d = -1;
    uint64_t current_time = dsn_now_ms() / 1000;
    for (int i = 0; i < temp_list.size(); ++i) {
        if (i + _checkpoint_reserve_min_count >= temp_list.size())
            break;
        int64_t d = temp_list[i];
        // we check last write time of "CURRENT" instead of directory, because the directory's
        // last write time may be updated by previous incompleted garbage collection.
381
        auto cpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(d));
Q
qinzuoyan 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
        auto current_file = ::dsn::utils::filesystem::path_combine(cpt_dir, "CURRENT");
        if (!::dsn::utils::filesystem::file_exists(current_file)) {
            max_del_d = d;
            continue;
        }
        time_t tm;
        if (!dsn::utils::filesystem::last_write_time(current_file, tm)) {
            dwarn("get last write time of file %s failed", current_file.c_str());
            break;
        }
        uint64_t last_write_time = (uint64_t)tm;
        if (last_write_time + _checkpoint_reserve_time_seconds >= current_time) {
            // not expired
            break;
        }
        max_del_d = d;
    }
    if (max_del_d == -1) {
        // no checkpoint to delete
        ddebug("%s: no checkpoint to garbage collection, checkpoints_count = %d",
402
               replica_name(),
Q
qinzuoyan 已提交
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
               (int)temp_list.size());
        return;
    }
    std::list<int64_t> to_delete_list;
    int64_t min_d = 0;
    int64_t max_d = 0;
    int checkpoints_count = 0;
    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        int delete_max_index = -1;
        for (int i = 0; i < _checkpoints.size(); ++i) {
            int64_t del_d = _checkpoints[i];
            if (i + _checkpoint_reserve_min_count >= _checkpoints.size() || del_d > max_del_d)
                break;
            to_delete_list.push_back(del_d);
            delete_max_index = i;
        }
420
        if (delete_max_index >= 0) {
Q
qinzuoyan 已提交
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437
            _checkpoints.erase(_checkpoints.begin(), _checkpoints.begin() + delete_max_index + 1);
        }

        if (!_checkpoints.empty()) {
            min_d = _checkpoints.front();
            max_d = _checkpoints.back();
            checkpoints_count = _checkpoints.size();
        } else {
            min_d = 0;
            max_d = 0;
            checkpoints_count = 0;
        }
    }

    // do delete
    std::list<int64_t> put_back_list;
    for (auto &del_d : to_delete_list) {
438 439
        auto cpt_dir =
            ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(del_d));
Q
qinzuoyan 已提交
440 441 442
        if (::dsn::utils::filesystem::directory_exists(cpt_dir)) {
            if (::dsn::utils::filesystem::remove_path(cpt_dir)) {
                ddebug("%s: checkpoint directory %s removed by garbage collection",
443
                       replica_name(),
Q
qinzuoyan 已提交
444 445 446
                       cpt_dir.c_str());
            } else {
                derror("%s: checkpoint directory %s remove failed by garbage collection",
447
                       replica_name(),
Q
qinzuoyan 已提交
448 449 450 451 452
                       cpt_dir.c_str());
                put_back_list.push_back(del_d);
            }
        } else {
            ddebug("%s: checkpoint directory %s does not exist, ignored by garbage collection",
453
                   replica_name(),
Q
qinzuoyan 已提交
454 455 456 457
                   cpt_dir.c_str());
        }
    }

458 459 460
    // put back checkpoints which is not deleted, to make it delete again in the next gc time.
    // ATTENTION: the put back checkpoint may be incomplete, which will cause failure on load. But
    // it would not cause data lost, because incomplete checkpoint can not be loaded successfully.
Q
qinzuoyan 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
    if (!put_back_list.empty()) {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        if (_checkpoints.empty() || put_back_list.back() < _checkpoints.front()) {
            // just insert to front will hold the order
            _checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
        } else {
            // need to re-sort
            _checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
            std::sort(_checkpoints.begin(), _checkpoints.end());
        }

        if (!_checkpoints.empty()) {
            min_d = _checkpoints.front();
            max_d = _checkpoints.back();
            checkpoints_count = _checkpoints.size();
        } else {
            min_d = 0;
            max_d = 0;
            checkpoints_count = 0;
        }
    }

    ddebug("%s: after checkpoint garbage collection, checkpoints_count = %d, "
           "min_checkpoint = %" PRId64 ", max_checkpoint = %" PRId64,
485
           replica_name(),
Q
qinzuoyan 已提交
486 487 488 489 490
           checkpoints_count,
           min_d,
           max_d);
}

491 492 493 494
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
                                                   int64_t timestamp,
                                                   dsn_message_t *requests,
                                                   int count)
Q
qinzuoyan 已提交
495 496 497 498 499
{
    dassert(_is_open, "");
    dassert(requests != nullptr, "");
    uint64_t start_time = dsn_now_ns();

500
    _physical_error = 0;
Q
qinzuoyan 已提交
501 502 503
    if (count == 1 &&
        ((::dsn::message_ex *)requests[0])->local_rpc_code ==
            ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
504
        _pfc_multi_put_qps->increment();
Q
qinzuoyan 已提交
505 506 507
        dsn_message_t request = requests[0];

        ::dsn::apps::update_response resp;
508 509
        resp.app_id = _gpid.get_app_id();
        resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
510 511 512 513 514 515 516 517
        resp.decree = decree;
        resp.server = _primary_address;

        ::dsn::apps::multi_put_request update;
        ::dsn::unmarshall(request, update);

        if (update.kvs.empty()) {
            // invalid argument
518 519
            derror("%s: invalid argument for multi_put from %s: "
                   "decree = %" PRId64 ", error = empty kvs",
520
                   replica_name(),
521
                   dsn_msg_from_address(request).to_string(),
Q
qinzuoyan 已提交
522 523 524 525 526 527 528 529 530
                   decree);

            ::dsn::rpc_replier<::dsn::apps::update_response> replier(
                dsn_msg_create_response(request));
            if (!replier.is_empty()) {
                // an invalid operation shoundn't be added to latency calculation
                resp.error = rocksdb::Status::kInvalidArgument;
                replier(resp);
            }
531
            return 0;
Q
qinzuoyan 已提交
532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
        }

        for (auto &kv : update.kvs) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, update.hash_key, kv.key);
            rocksdb::Slice slice_key(raw_key.data(), raw_key.length());
            rocksdb::SliceParts skey(&slice_key, 1);

            pegasus_generate_value(_value_schema_version,
                                   update.expire_ts_seconds,
                                   kv.value,
                                   _write_buf,
                                   _write_slices);
            rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

            _batch.Put(skey, svalue);
        }

        _wt_opts.given_decree = decree;
        rocksdb::Status status = _db->Write(_wt_opts, &_batch);
        if (!status.ok()) {
553 554
            derror("%s: rocksdb write failed for multi_put from %s: "
                   "decree = %" PRId64 ", error = %s",
555
                   replica_name(),
556
                   dsn_msg_from_address(request).to_string(),
Q
qinzuoyan 已提交
557 558 559 560 561 562 563
                   decree,
                   status.ToString().c_str());
            _physical_error = status.code();
        }

        ::dsn::rpc_replier<::dsn::apps::update_response> replier(dsn_msg_create_response(request));
        if (!replier.is_empty()) {
564
            _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
Q
qinzuoyan 已提交
565 566 567 568 569
            resp.error = status.code();
            replier(resp);
        }

        _batch.Clear();
570
        return _physical_error;
Q
qinzuoyan 已提交
571 572 573
    } else if (count == 1 &&
               ((::dsn::message_ex *)requests[0])->local_rpc_code ==
                   ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
574
        _pfc_multi_remove_qps->increment();
Q
qinzuoyan 已提交
575 576 577
        dsn_message_t request = requests[0];

        ::dsn::apps::multi_remove_response resp;
578 579
        resp.app_id = _gpid.get_app_id();
        resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
580 581 582 583 584 585 586 587
        resp.decree = decree;
        resp.server = _primary_address;

        ::dsn::apps::multi_remove_request update;
        ::dsn::unmarshall(request, update);

        if (update.sort_keys.empty()) {
            // invalid argument
588 589
            derror("%s: invalid argument for multi_remove from %s: "
                   "decree = %" PRId64 ", error = empty sort keys",
590
                   replica_name(),
591
                   dsn_msg_from_address(request).to_string(),
Q
qinzuoyan 已提交
592 593 594 595 596 597 598 599 600 601
                   decree);

            ::dsn::rpc_replier<::dsn::apps::multi_remove_response> replier(
                dsn_msg_create_response(request));
            if (!replier.is_empty()) {
                // an invalid operation shoundn't be added to latency calculation
                resp.error = rocksdb::Status::kInvalidArgument;
                resp.count = 0;
                replier(resp);
            }
602
            return 0;
Q
qinzuoyan 已提交
603 604 605 606 607 608 609 610 611 612 613
        }

        for (auto &sort_key : update.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, update.hash_key, sort_key);
            _batch.Delete(rocksdb::Slice(raw_key.data(), raw_key.length()));
        }

        _wt_opts.given_decree = decree;
        rocksdb::Status status = _db->Write(_wt_opts, &_batch);
        if (!status.ok()) {
614 615
            derror("%s: rocksdb write failed for multi_remove from %s: "
                   "decree = %" PRId64 ", error = %s",
616
                   replica_name(),
617
                   dsn_msg_from_address(request).to_string(),
Q
qinzuoyan 已提交
618 619 620 621 622 623 624 625 626 627 628
                   decree,
                   status.ToString().c_str());
            _physical_error = status.code();
            resp.count = 0;
        } else {
            resp.count = update.sort_keys.size();
        }

        ::dsn::rpc_replier<::dsn::apps::multi_remove_response> replier(
            dsn_msg_create_response(request));
        if (!replier.is_empty()) {
629
            _pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
Q
qinzuoyan 已提交
630 631 632 633 634
            resp.error = status.code();
            replier(resp);
        }

        _batch.Clear();
635
        return _physical_error;
Q
qinzuoyan 已提交
636 637 638 639 640 641 642
    } else {
        for (int i = 0; i < count; ++i) {
            dsn_message_t request = requests[i];
            dassert(request != nullptr, "");
            ::dsn::message_ex *msg = (::dsn::message_ex *)request;
            ::dsn::blob key;
            if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_PUT) {
643
                _pfc_put_qps->increment();
Q
qinzuoyan 已提交
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
                ::dsn::apps::update_request update;
                ::dsn::unmarshall(request, update);
                key = update.key;

                rocksdb::Slice slice_key(key.data(), key.length());
                rocksdb::SliceParts skey(&slice_key, 1);

                pegasus_generate_value(_value_schema_version,
                                       update.expire_ts_seconds,
                                       update.value,
                                       _write_buf,
                                       _write_slices);
                rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

                _batch.Put(skey, svalue);
                _batch_repliers.emplace_back(dsn_msg_create_response(request));
660
                _batch_perfcounters.push_back(_pfc_put_latency.get());
Q
qinzuoyan 已提交
661
            } else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_REMOVE) {
662
                _pfc_remove_qps->increment();
Q
qinzuoyan 已提交
663 664 665 666 667
                ::dsn::unmarshall(request, key);

                rocksdb::Slice skey(key.data(), key.length());
                _batch.Delete(skey);
                _batch_repliers.emplace_back(dsn_msg_create_response(request));
668
                _batch_perfcounters.push_back(_pfc_remove_latency.get());
Q
qinzuoyan 已提交
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
            } else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
                       msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
                dassert(false,
                        "rpc code not allow batch: %s",
                        ::dsn::task_code(msg->local_rpc_code).to_string());
            } else {
                dassert(false,
                        "rpc code not handled: %s",
                        ::dsn::task_code(msg->local_rpc_code).to_string());
            }

            if (msg->header->client.partition_hash != 0) {
                uint64_t partition_hash = pegasus_key_hash(key);
                dassert(msg->header->client.partition_hash == partition_hash,
                        "inconsistent partition hash");
684
                int thread_hash = _gpid.thread_hash();
685 686
                dassert(msg->header->client.thread_hash == thread_hash,
                        "inconsistent thread hash");
Q
qinzuoyan 已提交
687 688 689 690 691
            }

            if (_verbose_log) {
                ::dsn::blob hash_key, sort_key;
                pegasus_restore_key(key, hash_key, sort_key);
692 693
                ddebug("%s: rocksdb write from %s: "
                       "decree = %" PRId64 ", code = %s, hash_key = \"%s\", sort_key = \"%s\"",
694
                       replica_name(),
695
                       dsn_msg_from_address(request).to_string(),
Q
qinzuoyan 已提交
696
                       decree,
697
                       msg->local_rpc_code.to_string(),
698 699
                       ::pegasus::utils::c_escape_string(hash_key).c_str(),
                       ::pegasus::utils::c_escape_string(sort_key).c_str());
Q
qinzuoyan 已提交
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717
            }
        }
    }

    if (_batch.Count() == 0) {
        // write fake data
        rocksdb::Slice empty_key;
        rocksdb::SliceParts skey(&empty_key, 1);

        ::dsn::blob empty_value;
        pegasus_generate_value(_value_schema_version, 0, empty_value, _write_buf, _write_slices);
        rocksdb::SliceParts svalue(&_write_slices[0], _write_slices.size());

        _batch.Put(skey, svalue);
    }
    _wt_opts.given_decree = decree;
    auto status = _db->Write(_wt_opts, &_batch);
    if (!status.ok()) {
718 719 720 721
        dsn::rpc_address from_address;
        if (count > 0)
            from_address = dsn_msg_from_address(requests[0]);
        derror("%s: rocksdb write failed from %s: decree = %" PRId64 ", error = %s",
722
               replica_name(),
723
               from_address.to_string(),
Q
qinzuoyan 已提交
724 725 726 727 728 729 730
               decree,
               status.ToString().c_str());
        _physical_error = status.code();
    }

    ::dsn::apps::update_response resp;
    resp.error = status.code();
731 732
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
733 734 735 736 737
    resp.decree = decree;
    resp.server = _primary_address;

    dassert(_batch_repliers.size() == _batch_perfcounters.size(),
            "%s: repliers's size(%u) vs perfcounters's size(%u) not match",
738
            replica_name(),
Q
qinzuoyan 已提交
739 740 741 742 743
            _batch_repliers.size(),
            _batch_perfcounters.size());
    uint64_t latency = dsn_now_ns() - start_time;
    for (unsigned int i = 0; i != _batch_repliers.size(); ++i) {
        if (!_batch_repliers[i].is_empty()) {
744
            _batch_perfcounters[i]->set(latency);
Q
qinzuoyan 已提交
745 746 747 748 749 750 751
            _batch_repliers[i](resp);
        }
    }

    _batch.Clear();
    _batch_repliers.clear();
    _batch_perfcounters.clear();
752 753

    return _physical_error;
Q
qinzuoyan 已提交
754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
}

void pegasus_server_impl::on_put(const ::dsn::apps::update_request &update,
                                 ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    // Not a live entry point: single PUT requests (RPC_RRDB_RRDB_PUT) are applied by the
    // batched write path in this file. Reaching this handler is a programming error.
    (void)update;
    (void)reply;
    dassert(false, "not implemented");
}

void pegasus_server_impl::on_multi_put(const ::dsn::apps::multi_put_request &args,
                                       ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    // Not a live entry point: MULTI_PUT writes are presumably applied by the write-request
    // dispatcher of this class instead (NOTE(review): confirm). Always asserts.
    (void)args;
    (void)reply;
    dassert(false, "not implemented");
}

void pegasus_server_impl::on_remove(const ::dsn::blob &key,
                                    ::dsn::rpc_replier<::dsn::apps::update_response> &reply)
{
    // Not a live entry point: single REMOVE requests (RPC_RRDB_RRDB_REMOVE) are applied by
    // the batched write path in this file. Reaching this handler is a programming error.
    (void)key;
    (void)reply;
    dassert(false, "not implemented");
}

void pegasus_server_impl::on_multi_remove(
    const ::dsn::apps::multi_remove_request &args,
    ::dsn::rpc_replier<::dsn::apps::multi_remove_response> &reply)
{
    // Not a live entry point: MULTI_REMOVE writes are presumably applied by the write-request
    // dispatcher of this class instead (NOTE(review): confirm). Always asserts.
    (void)args;
    (void)reply;
    dassert(false, "not implemented");
}

// Serves a single-key GET.
//
// `key` is the raw rocksdb key (split back into hash_key / sort_key with
// pegasus_restore_key for logging). A stored record whose expire timestamp has
// passed is reported as NotFound. When the abnormal-get thresholds are set, slow
// or large reads are logged and counted. On success the user data (stripped of
// the value-schema header by pegasus_extract_user_data) is returned in resp.value.
void pegasus_server_impl::on_get(const ::dsn::blob &key,
                                 ::dsn::rpc_replier<::dsn::apps::read_response> &reply)
{
    dassert(_is_open, "");
    _pfc_get_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::read_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    rocksdb::Slice skey(key.data(), key.length());
    std::unique_ptr<std::string> value = ::dsn::make_unique<std::string>();
    rocksdb::Status status = _db->Get(_rd_opts, skey, value.get());

    // expired records are still physically present; hide them from the client
    if (status.ok()) {
        uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, *value);
        if (expire_ts > 0 && expire_ts <= ::pegasus::utils::epoch_now()) {
            _pfc_recent_expire_count->increment();
            if (_verbose_log) {
                derror("%s: rocksdb data expired for get from %s",
                       replica_name(),
                       reply.to_address().to_string());
            }
            status = rocksdb::Status::NotFound();
        }
    }

    if (!status.ok()) {
        if (_verbose_log) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            derror("%s: rocksdb get failed for get from %s: "
                   "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   ::pegasus::utils::c_escape_string(sort_key).c_str(),
                   status.ToString().c_str());
        } else if (!status.IsNotFound()) {
            // NotFound is a normal outcome; only real errors are logged when not verbose
            derror("%s: rocksdb get failed for get from %s: error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   status.ToString().c_str());
        }
    }

    // A zero threshold disables that particular check.
    // Note: value->size() is the stored size, so expired records are measured too.
    if (_abnormal_get_time_threshold_ns || _abnormal_get_size_threshold) {
        uint64_t time_used = dsn_now_ns() - start_time;
        if ((_abnormal_get_time_threshold_ns && time_used >= _abnormal_get_time_threshold_ns) ||
            (_abnormal_get_size_threshold && value->size() >= _abnormal_get_size_threshold)) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            dwarn("%s: rocksdb abnormal get from %s: "
                  "hash_key = \"%s\", sort_key = \"%s\", return = %s, "
                  "value_size = %d, time_used = %" PRIu64 " ns",
                  replica_name(),
                  reply.to_address().to_string(),
                  ::pegasus::utils::c_escape_string(hash_key).c_str(),
                  ::pegasus::utils::c_escape_string(sort_key).c_str(),
                  status.ToString().c_str(),
                  static_cast<int>(value->size()),
                  time_used);
            _pfc_recent_abnormal_count->increment();
        }
    }

    resp.error = status.code();
    if (status.ok()) {
        // `value` is moved into the extractor; it must not be used after this point
        pegasus_extract_user_data(_value_schema_version, std::move(value), resp.value);
    }

    _pfc_get_latency->set(dsn_now_ns() - start_time);

    reply(resp);
}

// Serves MULTI_GET for a single hash_key. Two modes:
//
//  * request.sort_keys is empty: scan the sort-key range between start_sortkey
//    and stop_sortkey (an empty stop_sortkey means "to the end of the hash_key"),
//    honoring the inclusive flags. When the filter is FT_MATCH_PREFIX with a
//    non-empty pattern, the range is narrowed further. The scan runs forward or
//    (request.reverse) backward; results are always returned in ascending key order.
//  * otherwise: point-lookup every listed sort key with a single rocksdb MultiGet.
//
// max_kv_count / max_kv_size <= 0 mean "unlimited". Expired records are skipped.
// When a limit stops the read early, resp.error is kIncomplete; on a rocksdb error
// resp.kvs is cleared and resp.error carries the error code.
void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &request,
                                       ::dsn::rpc_replier<::dsn::apps::multi_get_response> &reply)
{
    dassert(_is_open, "");
    _pfc_multi_get_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::multi_get_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        derror("%s: invalid argument for multi_get from %s: "
               "sort key filter type %d not supported",
               replica_name(),
               reply.to_address().to_string(),
               request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
        reply(resp);
        return;
    }

    // non-positive limits mean "no limit"
    int32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
    int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    int32_t count = 0;         // records returned
    int64_t size = 0;          // bytes of returned keys + values
    int32_t iterate_count = 0; // records visited by the range scan
    int32_t expire_count = 0;  // records skipped because expired
    int32_t filter_count = 0;  // records skipped by the sort key filter

    if (request.sort_keys.empty()) {
        ::dsn::blob range_start_key, range_stop_key;
        pegasus_generate_key(range_start_key, request.hash_key, request.start_sortkey);
        bool start_inclusive = request.start_inclusive;
        bool stop_inclusive;
        if (request.stop_sortkey.length() == 0) {
            // no stop sort key: scan up to (excluding) the first key after this hash_key
            pegasus_generate_next_blob(range_stop_key, request.hash_key);
            stop_inclusive = false;
        } else {
            pegasus_generate_key(range_stop_key, request.hash_key, request.stop_sortkey);
            stop_inclusive = request.stop_inclusive;
        }

        rocksdb::Slice start(range_start_key.data(), range_start_key.length());
        rocksdb::Slice stop(range_stop_key.data(), range_stop_key.length());

        // limit key range by prefix filter
        // (the prefix blobs must outlive `start`/`stop`, which may point into them)
        ::dsn::blob prefix_start_key, prefix_stop_key;
        if (request.sort_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
            request.sort_key_filter_pattern.length() > 0) {
            pegasus_generate_key(
                prefix_start_key, request.hash_key, request.sort_key_filter_pattern);
            pegasus_generate_next_blob(
                prefix_stop_key, request.hash_key, request.sort_key_filter_pattern);

            rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
            if (prefix_start.compare(start) > 0) {
                start = prefix_start;
                start_inclusive = true;
            }

            rocksdb::Slice prefix_stop(prefix_stop_key.data(), prefix_stop_key.length());
            if (prefix_stop.compare(stop) <= 0) {
                stop = prefix_stop;
                stop_inclusive = false;
            }
        }

        // check if range is empty
        int range_cmp = start.compare(stop);
        if (range_cmp > 0 || (range_cmp == 0 && (!start_inclusive || !stop_inclusive))) {
            // empty sort key range
            if (_verbose_log) {
                dwarn("%s: empty sort key range for multi_get from %s: hash_key = \"%s\", "
                      "start_sort_key = \"%s\" (%s), stop_sort_key = \"%s\" (%s), "
                      "sort_key_filter_type = %s, sort_key_filter_pattern = \"%s\", "
                      "final_start = \"%s\" (%s), final_stop = \"%s\" (%s)",
                      replica_name(),
                      reply.to_address().to_string(),
                      ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                      ::pegasus::utils::c_escape_string(request.start_sortkey).c_str(),
                      request.start_inclusive ? "inclusive" : "exclusive",
                      ::pegasus::utils::c_escape_string(request.stop_sortkey).c_str(),
                      request.stop_inclusive ? "inclusive" : "exclusive",
                      ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
                          ->second,
                      ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern).c_str(),
                      ::pegasus::utils::c_escape_string(start).c_str(),
                      start_inclusive ? "inclusive" : "exclusive",
                      ::pegasus::utils::c_escape_string(stop).c_str(),
                      stop_inclusive ? "inclusive" : "exclusive");
            }
            resp.error = rocksdb::Status::kOk;
            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
            reply(resp);
            return;
        }

        std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
        // set once the scan has reached the end of the requested range
        bool complete = false;
        if (!request.reverse) {
            it->Seek(start);
            bool first_exclusive = !start_inclusive;
            while (count < max_kv_count && size < max_kv_size && it->Valid()) {
                iterate_count++;

                // check stop sort key
                int c = it->key().compare(stop);
                if (c > 0 || (c == 0 && !stop_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check start sort key: only the first visited key can equal `start`
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(start) == 0) {
                        // discard start_sortkey
                        it->Next();
                        continue;
                    }
                }

                // extract value: 1 = appended, 2 = expired, 3 = filtered out
                int r = append_key_value_for_multi_get(resp.kvs,
                                                       it->key(),
                                                       it->value(),
                                                       request.sort_key_filter_type,
                                                       request.sort_key_filter_pattern,
                                                       epoch_now,
                                                       request.no_value);
                if (r == 1) {
                    count++;
                    auto &kv = resp.kvs.back();
                    size += kv.key.length() + kv.value.length();
                } else if (r == 2) {
                    expire_count++;
                } else { // r == 3
                    filter_count++;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Next();
            }
        } else { // reverse
            it->SeekForPrev(stop);
            bool first_exclusive = !stop_inclusive;
            std::vector<::dsn::apps::key_value> reverse_kvs;
            while (count < max_kv_count && size < max_kv_size && it->Valid()) {
                iterate_count++;

                // check start sort key
                int c = it->key().compare(start);
                if (c < 0 || (c == 0 && !start_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check stop sort key: only the first visited key can equal `stop`
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(stop) == 0) {
                        // discard stop_sortkey
                        it->Prev();
                        continue;
                    }
                }

                // extract value: 1 = appended, 2 = expired, 3 = filtered out
                int r = append_key_value_for_multi_get(reverse_kvs,
                                                       it->key(),
                                                       it->value(),
                                                       request.sort_key_filter_type,
                                                       request.sort_key_filter_pattern,
                                                       epoch_now,
                                                       request.no_value);
                if (r == 1) {
                    count++;
                    auto &kv = reverse_kvs.back();
                    size += kv.key.length() + kv.value.length();
                } else if (r == 2) {
                    expire_count++;
                } else { // r == 3
                    filter_count++;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Prev();
            }

            if (it->status().ok() && !reverse_kvs.empty()) {
                // revert order to make resp.kvs ordered in sort_key
                resp.kvs.reserve(reverse_kvs.size());
                for (auto rit = reverse_kvs.rbegin(); rit != reverse_kvs.rend(); ++rit) {
                    resp.kvs.emplace_back(std::move(*rit));
                }
            }
        }

        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (_verbose_log) {
                derror("%s: rocksdb scan failed for multi_get from %s: "
                       "hash_key = \"%s\", reverse = %s, error = %s",
                       replica_name(),
                       reply.to_address().to_string(),
                       ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                       request.reverse ? "true" : "false",
                       it->status().ToString().c_str());
            } else {
                derror("%s: rocksdb scan failed for multi_get from %s: "
                       "reverse = %s, error = %s",
                       replica_name(),
                       reply.to_address().to_string(),
                       request.reverse ? "true" : "false",
                       it->status().ToString().c_str());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // scan not completed: stopped by max_kv_count / max_kv_size
            resp.error = rocksdb::Status::kIncomplete;
        }
    } else {
        bool error_occurred = false;
        rocksdb::Status final_status;
        bool exceed_limit = false;
        // keys_holder owns the encoded keys that the Slices in `keys` point into
        std::vector<::dsn::blob> keys_holder;
        std::vector<rocksdb::Slice> keys;
        std::vector<std::string> values;
        keys_holder.reserve(request.sort_keys.size());
        keys.reserve(request.sort_keys.size());
        for (const auto &sort_key : request.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, request.hash_key, sort_key);
            keys.emplace_back(raw_key.data(), raw_key.length());
            keys_holder.emplace_back(std::move(raw_key));
        }

        std::vector<rocksdb::Status> statuses = _db->MultiGet(_rd_opts, keys, &values);
        for (size_t i = 0; i < keys.size(); i++) {
            rocksdb::Status &status = statuses[i];
            std::string &value = values[i];
            // print log
            if (!status.ok()) {
                if (_verbose_log) {
                    derror("%s: rocksdb get failed for multi_get from %s: "
                           "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                           replica_name(),
                           reply.to_address().to_string(),
                           ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                           ::pegasus::utils::c_escape_string(request.sort_keys[i]).c_str(),
                           status.ToString().c_str());
                } else if (!status.IsNotFound()) {
                    derror("%s: rocksdb get failed for multi_get from %s: error = %s",
                           replica_name(),
                           reply.to_address().to_string(),
                           status.ToString().c_str());
                }
            }
            // check ttl
            if (status.ok()) {
                uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
                if (expire_ts > 0 && expire_ts <= epoch_now) {
                    expire_count++;
                    if (_verbose_log) {
                        derror("%s: rocksdb data expired for multi_get from %s",
                               replica_name(),
                               reply.to_address().to_string());
                    }
                    status = rocksdb::Status::NotFound();
                }
            }
            // extract value
            if (status.ok()) {
                // check if exceed limit
                if (count >= max_kv_count || size >= max_kv_size) {
                    exceed_limit = true;
                    break;
                }
                ::dsn::apps::key_value kv;
                kv.key = request.sort_keys[i];
                if (!request.no_value) {
                    pegasus_extract_user_data(_value_schema_version,
                                              ::dsn::make_unique<std::string>(std::move(value)),
                                              kv.value);
                }
                count++;
                size += kv.key.length() + kv.value.length();
                resp.kvs.emplace_back(std::move(kv));
            }
            // if error occurred (NotFound is not an error here)
            if (!status.ok() && !status.IsNotFound()) {
                error_occurred = true;
                final_status = status;
                break;
            }
        }

        if (error_occurred) {
            resp.error = final_status.code();
            resp.kvs.clear();
        } else if (exceed_limit) {
            resp.error = rocksdb::Status::kIncomplete;
        } else {
            resp.error = rocksdb::Status::kOk;
        }
    }

    // a zero threshold disables that particular check
    if (_abnormal_multi_get_time_threshold_ns || _abnormal_multi_get_size_threshold ||
        _abnormal_multi_get_iterate_count_threshold) {
        uint64_t time_used = dsn_now_ns() - start_time;
        if ((_abnormal_multi_get_time_threshold_ns &&
             time_used >= _abnormal_multi_get_time_threshold_ns) ||
            (_abnormal_multi_get_size_threshold &&
             (uint64_t)size >= _abnormal_multi_get_size_threshold) ||
            (_abnormal_multi_get_iterate_count_threshold &&
             (uint64_t)iterate_count >= _abnormal_multi_get_iterate_count_threshold)) {
            dwarn("%s: rocksdb abnormal multi_get from %s: hash_key = \"%s\", "
                  "start_sort_key = \"%s\" (%s), stop_sort_key = \"%s\" (%s), "
                  "sort_key_filter_type = %s, sort_key_filter_pattern = \"%s\", "
                  "result_count = %d, result_size = %" PRId64 ", iterate_count = %d, "
                  "expire_count = %d, filter_count = %d, time_used = %" PRIu64 " ns",
                  replica_name(),
                  reply.to_address().to_string(),
                  ::pegasus::utils::c_escape_string(request.hash_key).c_str(),
                  ::pegasus::utils::c_escape_string(request.start_sortkey).c_str(),
                  request.start_inclusive ? "inclusive" : "exclusive",
                  ::pegasus::utils::c_escape_string(request.stop_sortkey).c_str(),
                  request.stop_inclusive ? "inclusive" : "exclusive",
                  ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
                      ->second,
                  ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern).c_str(),
                  count,
                  size,
                  iterate_count,
                  expire_count,
                  filter_count,
                  time_used);
            _pfc_recent_abnormal_count->increment();
        }
    }

    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }
    if (filter_count > 0) {
        _pfc_recent_filter_count->add(filter_count);
    }
    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);

    reply(resp);
}

// Counts the non-expired records stored under `hash_key`.
//
// Scans every rocksdb key from pegasus_generate_key(hash_key, <empty sort key>)
// up to the bound produced by pegasus_generate_next_blob(hash_key), which is used
// as the iterator's upper bound. Expired records are skipped and added to the
// expire counter. On a scan error the reply carries the rocksdb error code with
// count = 0.
void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
                                           ::dsn::rpc_replier<::dsn::apps::count_response> &reply)
{
    dassert(_is_open, "");

    ::dsn::apps::count_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    // scan
    ::dsn::blob start_key, stop_key;
    pegasus_generate_key(start_key, hash_key, ::dsn::blob());
    pegasus_generate_next_blob(stop_key, hash_key);
    rocksdb::Slice start(start_key.data(), start_key.length());
    rocksdb::Slice stop(stop_key.data(), stop_key.length());
    rocksdb::ReadOptions options = _rd_opts;
    // ReadOptions keeps only a pointer: `stop` (and `stop_key`) must outlive `it`
    options.iterate_upper_bound = &stop;
    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(options));
    it->Seek(start);
    resp.count = 0;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    uint64_t expire_count = 0;
    while (it->Valid()) {
        uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, it->value());
        if (expire_ts > 0 && expire_ts <= epoch_now) {
            expire_count++;
            if (_verbose_log) {
                derror("%s: rocksdb data expired for sortkey_count from %s",
                       replica_name(),
                       reply.to_address().to_string());
            }
        } else {
            resp.count++;
        }
        it->Next();
    }
    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }

    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (_verbose_log) {
            derror("%s: rocksdb scan failed for sortkey_count from %s: "
                   "hash_key = \"%s\", error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   it->status().ToString().c_str());
        } else {
            derror("%s: rocksdb scan failed for sortkey_count from %s: error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   it->status().ToString().c_str());
        }
        // a partial count would be misleading
        resp.count = 0;
    }

    reply(resp);
}

// Returns the remaining time-to-live of `key` in seconds.
//
// resp.ttl_seconds is -1 when the record has no expire timestamp (expire_ts == 0).
// An expired record is reported as NotFound, as is a missing one; in that case
// ttl_seconds is left unset.
void pegasus_server_impl::on_ttl(const ::dsn::blob &key,
                                 ::dsn::rpc_replier<::dsn::apps::ttl_response> &reply)
{
    dassert(_is_open, "");

    ::dsn::apps::ttl_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    rocksdb::Slice skey(key.data(), key.length());
    std::string value;
    rocksdb::Status status = _db->Get(_rd_opts, skey, &value);

    // initialized so it is never read indeterminate, even if the control flow below changes
    uint32_t expire_ts = 0;
    uint32_t now_ts = ::pegasus::utils::epoch_now();
    if (status.ok()) {
        expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
        if (expire_ts > 0 && expire_ts <= now_ts) {
            _pfc_recent_expire_count->increment();
            if (_verbose_log) {
                derror("%s: rocksdb data expired for ttl from %s",
                       replica_name(),
                       reply.to_address().to_string());
            }
            status = rocksdb::Status::NotFound();
        }
    }

    if (!status.ok()) {
        if (_verbose_log) {
            ::dsn::blob hash_key, sort_key;
            pegasus_restore_key(key, hash_key, sort_key);
            derror("%s: rocksdb get failed for ttl from %s: "
                   "hash_key = \"%s\", sort_key = \"%s\", error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   ::pegasus::utils::c_escape_string(hash_key).c_str(),
                   ::pegasus::utils::c_escape_string(sort_key).c_str(),
                   status.ToString().c_str());
        } else if (!status.IsNotFound()) {
            derror("%s: rocksdb get failed for ttl from %s: error = %s",
                   replica_name(),
                   reply.to_address().to_string(),
                   status.ToString().c_str());
        }
    }

    resp.error = status.code();
    if (status.ok()) {
        if (expire_ts > 0) {
            // expire_ts > now_ts here (expired records became NotFound above),
            // so the unsigned subtraction cannot wrap
            resp.ttl_seconds = expire_ts - now_ts;
        } else {
            // no ttl
            resp.ttl_seconds = -1;
        }
    }

    reply(resp);
}

// Task code for locally scheduled, delayed work of the pegasus server, run with
// TASK_PRIORITY_COMMON on THREAD_POOL_DEFAULT. NOTE(review): its users are not
// visible in this excerpt; presumably the scanner handlers — confirm.
DEFINE_TASK_CODE(LOCAL_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)

void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request &request,
Q
qinzuoyan 已提交
1354 1355 1356
                                         ::dsn::rpc_replier<::dsn::apps::scan_response> &reply)
{
    dassert(_is_open, "");
1357
    _pfc_scan_qps->increment();
1358
    uint64_t start_time = dsn_now_ns();
Q
qinzuoyan 已提交
1359 1360

    ::dsn::apps::scan_response resp;
1361 1362
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
Q
qinzuoyan 已提交
1363 1364
    resp.server = _primary_address;

1365
    if (!is_filter_type_supported(request.hash_key_filter_type)) {
1366 1367
        derror("%s: invalid argument for get_scanner from %s: "
               "hash key filter type %d not supported",
1368
               replica_name(),
1369
               reply.to_address().to_string(),
1370 1371 1372 1373 1374 1375
               request.hash_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        reply(resp);
        return;
    }
    if (!is_filter_type_supported(request.sort_key_filter_type)) {
1376 1377
        derror("%s: invalid argument for get_scanner from %s: "
               "sort key filter type %d not supported",
1378
               replica_name(),
1379
               reply.to_address().to_string(),
1380 1381 1382 1383 1384 1385
               request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        reply(resp);
        return;
    }

1386 1387
    bool start_inclusive = request.start_inclusive;
    bool stop_inclusive = request.stop_inclusive;
1388 1389
    rocksdb::Slice start(request.start_key.data(), request.start_key.length());
    rocksdb::Slice stop(request.stop_key.data(), request.stop_key.length());
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409

    // limit key range by prefix filter
    // because data is not ordered by hash key (hash key "aa" is greater than "b"),
    // so we can only limit the start range by hash key filter.
    ::dsn::blob prefix_start_key;
    if (request.hash_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
        request.hash_key_filter_pattern.length() > 0) {
        pegasus_generate_key(prefix_start_key, request.hash_key_filter_pattern, ::dsn::blob());
        rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
        if (prefix_start.compare(start) > 0) {
            start = prefix_start;
            start_inclusive = true;
        }
    }

    // check if range is empty
    int c = start.compare(stop);
    if (c > 0 || (c == 0 && (!start_inclusive || !stop_inclusive))) {
        // empty key range
        if (_verbose_log) {
1410
            dwarn("%s: empty key range for get_scanner from %s: "
1411
                  "start_key = \"%s\" (%s), stop_key = \"%s\" (%s)",
1412
                  replica_name(),
1413
                  reply.to_address().to_string(),
1414 1415 1416 1417 1418 1419
                  ::pegasus::utils::c_escape_string(request.start_key).c_str(),
                  request.start_inclusive ? "inclusive" : "exclusive",
                  ::pegasus::utils::c_escape_string(request.stop_key).c_str(),
                  request.stop_inclusive ? "inclusive" : "exclusive");
        }
        resp.error = rocksdb::Status::kOk;
1420
        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
1421 1422 1423
        reply(resp);
        return;
    }
Q
qinzuoyan 已提交
1424

1425
    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
Q
qinzuoyan 已提交
1426 1427
    it->Seek(start);
    bool complete = false;
1428
    bool first_exclusive = !start_inclusive;
Q
qinzuoyan 已提交
1429
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
1430
    uint64_t expire_count = 0;
1431 1432
    uint64_t filter_count = 0;
    int32_t count = 0;
1433
    resp.kvs.reserve(request.batch_size);
1434 1435
    while (count < request.batch_size && it->Valid()) {
        int c = it->key().compare(stop);
1436
        if (c > 0 || (c == 0 && !stop_inclusive)) {
1437
            // out of range
Q
qinzuoyan 已提交
1438 1439
            complete = true;
            break;
1440 1441 1442 1443 1444 1445 1446
        }

        if (first_exclusive) {
            first_exclusive = false;
            if (it->key().compare(start) == 0) {
                // discard start_sortkey
                it->Next();
Q
qinzuoyan 已提交
1447 1448 1449 1450
                continue;
            }
        }

1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462
        int r = append_key_value_for_scan(resp.kvs,
                                          it->key(),
                                          it->value(),
                                          request.hash_key_filter_type,
                                          request.hash_key_filter_pattern,
                                          request.sort_key_filter_type,
                                          request.sort_key_filter_pattern,
                                          epoch_now,
                                          request.no_value);
        if (r == 1) {
            count++;
        } else if (r == 2) {
1463
            expire_count++;
1464 1465
        } else { // r == 3
            filter_count++;
1466
        }
1467 1468 1469 1470 1471 1472 1473 1474

        if (c == 0) {
            // seek to the last position
            complete = true;
            break;
        }

        it->Next();
1475
    }
Q
qinzuoyan 已提交
1476

1477 1478 1479 1480
    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (_verbose_log) {
1481
            derror("%s: rocksdb scan failed for get_scanner from %s: "
1482
                   "start_key = \"%s\" (%s), stop_key = \"%s\" (%s), "
1483
                   "batch_size = %d, read_count = %d, error = %s",
1484
                   replica_name(),
1485
                   reply.to_address().to_string(),
1486
                   ::pegasus::utils::c_escape_string(start).c_str(),
1487
                   request.start_inclusive ? "inclusive" : "exclusive",
1488
                   ::pegasus::utils::c_escape_string(stop).c_str(),
1489 1490 1491 1492 1493
                   request.stop_inclusive ? "inclusive" : "exclusive",
                   request.batch_size,
                   count,
                   it->status().ToString().c_str());
        } else {
1494
            derror("%s: rocksdb scan failed for get_scanner from %s: error = %s",
1495
                   replica_name(),
1496
                   reply.to_address().to_string(),
1497 1498 1499 1500 1501
                   it->status().ToString().c_str());
        }
        resp.kvs.clear();
    } else if (it->Valid() && !complete) {
        // scan not completed
Q
qinzuoyan 已提交
1502 1503 1504
        std::unique_ptr<pegasus_scan_context> context(
            new pegasus_scan_context(std::move(it),
                                     std::string(stop.data(), stop.size()),
1505 1506 1507 1508 1509 1510 1511 1512 1513
                                     request.stop_inclusive,
                                     request.hash_key_filter_type,
                                     std::string(request.hash_key_filter_pattern.data(),
                                                 request.hash_key_filter_pattern.length()),
                                     request.sort_key_filter_type,
                                     std::string(request.sort_key_filter_pattern.data(),
                                                 request.sort_key_filter_pattern.length()),
                                     request.batch_size,
                                     request.no_value));
Q
qinzuoyan 已提交
1514 1515
        int64_t handle = _context_cache.put(std::move(context));
        resp.context_id = handle;
1516 1517
        // if the context is used, it will be fetched and re-put into cache,
        // which will change the handle,
Q
qinzuoyan 已提交
1518 1519 1520 1521 1522 1523
        // then the delayed task will fetch null context by old handle, and do nothing.
        ::dsn::tasking::enqueue(LOCAL_PEGASUS_SERVER_DELAY,
                                this,
                                [this, handle]() { _context_cache.fetch(handle); },
                                0,
                                std::chrono::minutes(5));
1524 1525 1526
    } else {
        // scan completed
        resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
Q
qinzuoyan 已提交
1527 1528
    }

1529
    if (expire_count > 0) {
1530
        _pfc_recent_expire_count->add(expire_count);
1531 1532
    }
    if (filter_count > 0) {
1533
        _pfc_recent_filter_count->add(filter_count);
1534 1535
    }

1536
    _pfc_scan_latency->set(dsn_now_ns() - start_time);
Q
qinzuoyan 已提交
1537 1538 1539
    reply(resp);
}

1540
// Handles a scan RPC that continues a scan started by on_get_scanner().
//
// The scan context (iterator, stop key and filters) is fetched, and thereby removed,
// from `_context_cache` by `request.context_id`. Up to the stored batch_size records are
// read. If the range is not exhausted, the context is re-put under a new handle, which is
// returned in `resp.context_id`, and a 5-minute delayed task is scheduled to drop it.
// Otherwise `resp.context_id` is SCAN_CONTEXT_ID_COMPLETED. An unknown or expired
// context_id yields resp.error = kNotFound.
void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
                                  ::dsn::rpc_replier<::dsn::apps::scan_response> &reply)
{
    dassert(_is_open, "");
    _pfc_scan_qps->increment();
    uint64_t start_time = dsn_now_ns();

    ::dsn::apps::scan_response resp;
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
    if (context) {
        rocksdb::Iterator *it = context->iterator.get();
        int32_t batch_size = context->batch_size;
        const rocksdb::Slice &stop = context->stop;
        bool stop_inclusive = context->stop_inclusive;
        ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
        const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
        // The sort-key filter must come from the sort-key fields of the context;
        // reading the hash-key fields here would silently apply the wrong filter.
        ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
        const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
        bool no_value = context->no_value;
        bool complete = false;
        uint32_t epoch_now = ::pegasus::utils::epoch_now();
        uint64_t expire_count = 0;
        uint64_t filter_count = 0;
        int32_t count = 0;

        while (count < batch_size && it->Valid()) {
            int c = it->key().compare(stop);
            if (c > 0 || (c == 0 && !stop_inclusive)) {
                // out of range
                complete = true;
                break;
            }

            // 1: record appended, 2: record expired, 3: record filtered out
            int r = append_key_value_for_scan(resp.kvs,
                                              it->key(),
                                              it->value(),
                                              hash_key_filter_type,
                                              hash_key_filter_pattern,
                                              sort_key_filter_type,
                                              sort_key_filter_pattern,
                                              epoch_now,
                                              no_value);
            if (r == 1) {
                count++;
            } else if (r == 2) {
                expire_count++;
            } else { // r == 3
                filter_count++;
            }

            if (c == 0) {
                // seek to the last position
                complete = true;
                break;
            }

            it->Next();
        }

        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (_verbose_log) {
                derror("%s: rocksdb scan failed for scan from %s: "
                       "context_id= %" PRId64 ", stop_key = \"%s\" (%s), "
                       "batch_size = %d, read_count = %d, error = %s",
                       replica_name(),
                       reply.to_address().to_string(),
                       request.context_id,
                       ::pegasus::utils::c_escape_string(stop).c_str(),
                       stop_inclusive ? "inclusive" : "exclusive",
                       batch_size,
                       count,
                       it->status().ToString().c_str());
            } else {
                derror("%s: rocksdb scan failed for scan from %s: error = %s",
                       replica_name(),
                       reply.to_address().to_string(),
                       it->status().ToString().c_str());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // scan not completed: park the context again under a fresh handle
            int64_t handle = _context_cache.put(std::move(context));
            resp.context_id = handle;
            ::dsn::tasking::enqueue(LOCAL_PEGASUS_SERVER_DELAY,
                                    this,
                                    [this, handle]() { _context_cache.fetch(handle); },
                                    0,
                                    std::chrono::minutes(5));
        } else {
            // scan completed
            resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
        }

        if (expire_count > 0) {
            _pfc_recent_expire_count->add(expire_count);
        }
        if (filter_count > 0) {
            _pfc_recent_filter_count->add(filter_count);
        }
    } else {
        resp.error = rocksdb::Status::Code::kNotFound;
    }

    _pfc_scan_latency->set(dsn_now_ns() - start_time);
    reply(resp);
}

// Handles a clear_scanner RPC: `args` is the scan context handle handed out by
// on_get_scanner()/on_scan(). Fetching it removes the context from the cache, and
// discarding the returned owner releases the context together with its iterator.
void pegasus_server_impl::on_clear_scanner(const int64_t &args)
{
    _context_cache.fetch(args);
}

DEFINE_TASK_CODE(UPDATING_ROCKSDB_SSTSIZE, TASK_PRIORITY_COMMON, THREAD_POOL_REPLICATION_LONG)

// Opens the replica's rocksdb instance located at <data_dir>/rdb.
//
// argv: argv[0] is skipped; argv[1..argc-1] must be key/value pairs of app envs
// (compounded in replication_app_base::open()), so argc - 1 must be even.
//
// Three situations are distinguished:
//   1. <data_dir>/rdb already exists         -> open it as-is;
//   2. the envs yield no restore dir         -> open a brand-new db, unless the envs
//                                               demand a forced restore (then fail);
//   3. the envs yield a restore dir          -> rename it to <data_dir>/rdb and open it;
//                                               if it does not exist, fail on a forced
//                                               restore, otherwise open a new db.
//
// After rocksdb is open: the value schema version is validated, the TTL compaction
// filter is enabled, envs are applied, checkpoints are parsed, an async checkpoint is
// taken if last_durable_decree() differs from the last flushed decree, and the periodic
// sst-size updating timer is started.
//
// Returns ERR_OK on success, ERR_INVALID_PARAMETERS for malformed argv,
// ERR_FILE_OPERATION_FAILED for restore problems, ERR_LOCAL_APP_FAILURE if rocksdb cannot
// be opened or the schema version is unsupported, or the async checkpoint's error.
::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
    dassert(!_is_open, "");
    ddebug("%s: start to open app %s", replica_name(), data_dir().c_str());

    rocksdb::Options opts = _db_opts;
    opts.create_if_missing = true;
    opts.error_if_exists = false;
    opts.compaction_filter = &_key_ttl_compaction_filter;
    opts.default_value_schema_version = PEGASUS_VALUE_SCHEMA_MAX_VERSION;

    // ---- collect app envs from the argv key/value pairs ----
    std::map<std::string, std::string> envs;
    if (argc > 0) {
        const bool malformed_args = argv == nullptr || (argc - 1) % 2 != 0;
        if (malformed_args) {
            derror("%s: parse envs failed, because invalid argc = %d or argv = nullptr",
                   replica_name(),
                   argc);
            return ::dsn::ERR_INVALID_PARAMETERS;
        }
        for (int i = 1; i < argc; i += 2) {
            envs.emplace(argv[i], argv[i + 1]);
        }
    }

    // ---- decide between open-existing / open-new / restore ----
    auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
    if (::dsn::utils::filesystem::path_exists(path)) {
        // case 1: the db is already there
        ddebug("%s: rdb is already exist, path = %s", replica_name(), path.c_str());
    } else {
        const std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
        const std::string &restore_dir = restore_info.first;
        const bool force_restore = restore_info.second;

        if (restore_dir.empty()) {
            // case 2: nothing to restore from
            if (force_restore) {
                derror("%s: try to restore, but we can't combine restore_dir from envs",
                       replica_name());
                return ::dsn::ERR_FILE_OPERATION_FAILED;
            }
            dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
        } else {
            // case 3: restore by renaming restore_dir to rdb, then open normally
            ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str());
            const bool restore_dir_exists =
                ::dsn::utils::filesystem::directory_exists(restore_dir);
            if (!restore_dir_exists && force_restore) {
                derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s",
                       replica_name(),
                       restore_dir.c_str());
                return ::dsn::ERR_FILE_OPERATION_FAILED;
            }
            if (!restore_dir_exists) {
                dwarn("%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
                      "it, the role of this replica must not primary, so we open a new db on the "
                      "path(%s)",
                      replica_name(),
                      restore_dir.c_str(),
                      path.c_str());
            } else if (!::dsn::utils::filesystem::rename_path(restore_dir.c_str(), path.c_str())) {
                derror("%s: rename restore_dir(%s) to rdb(%s) failed",
                       replica_name(),
                       restore_dir.c_str(),
                       path.c_str());
                return ::dsn::ERR_FILE_OPERATION_FAILED;
            } else {
                ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed",
                       replica_name(),
                       restore_dir.c_str(),
                       path.c_str());
            }
        }
    }

    // ---- open rocksdb ----
    ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());

    auto status = rocksdb::DB::Open(opts, path, &_db);
    if (!status.ok()) {
        derror("%s: open app failed, error = %s", replica_name(), status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // releases the db opened above on the failure paths that follow
    auto discard_db = [this]() {
        delete _db;
        _db = nullptr;
    };

    _last_committed_decree = _db->GetLastFlushedDecree();
    _value_schema_version = _db->GetValueSchemaVersion();
    if (_value_schema_version > PEGASUS_VALUE_SCHEMA_MAX_VERSION) {
        derror("%s: open app failed, unsupported value schema version %" PRIu32,
               replica_name(),
               _value_schema_version);
        discard_db();
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // the filter may only be enabled once the correct value_schema_version is known
    _key_ttl_compaction_filter.SetValueSchemaVersion(_value_schema_version);
    _key_ttl_compaction_filter.EnableFilter();

    update_app_envs(envs);

    parse_checkpoints();

    // Checkpoint if necessary to make last_durable_decree() fresh; an async checkpoint
    // suffices because the memtable is empty right after opening.
    const int64_t last_flushed = _db->GetLastFlushedDecree();
    if (last_flushed != last_durable_decree()) {
        ddebug("%s: start to do async checkpoint, last_durable_decree = %" PRId64
               ", last_flushed_decree = %" PRId64,
               replica_name(),
               last_durable_decree(),
               last_flushed);
        auto err = async_checkpoint(false);
        if (err != ::dsn::ERR_OK) {
            derror("%s: create checkpoint failed, error = %s", replica_name(), err.to_string());
            discard_db();
            return err;
        }
        dassert(last_flushed == last_durable_decree(),
                "last durable decree mismatch after checkpoint: %" PRId64 " vs %" PRId64,
                last_flushed,
                last_durable_decree());
    }

    ddebug("%s: open app succeed, value_schema_version = %" PRIu32
           ", last_durable_decree = %" PRId64 "",
           replica_name(),
           _value_schema_version,
           last_durable_decree());

    _is_open = true;

    // ---- periodic sst-size statistics ----
    dinfo("%s: start the updating sstsize timer task", replica_name());
    _updating_task = ::dsn::tasking::enqueue_timer(
        UPDATING_ROCKSDB_SSTSIZE,
        this,
        [this]() { this->updating_rocksdb_sstsize(); },
        std::chrono::seconds(_updating_rocksdb_sstsize_interval_seconds),
        0,
        std::chrono::seconds(30));

    return ::dsn::ERR_OK;
}

// Closes the replica's rocksdb instance.
//
// clear_state == false: the memtable is flushed first (a failed flush is only logged).
// clear_state == true : after closing, the whole data_dir() is removed and the sst
//                       count/size counters are reset.
// In both cases scan contexts are dropped, the sst-size timer task is cancelled, and
// the in-memory checkpoint list and last durable decree are reset.
//
// Returns ERR_FILE_OPERATION_FAILED if data_dir() cannot be removed, ERR_OK otherwise.
::dsn::error_code pegasus_server_impl::stop(bool clear_state)
{
    // never opened (or already stopped): nothing to close, and nothing to clear
    if (!_is_open) {
        dassert(_db == nullptr, "");
        dassert(!clear_state, "should not be here if do clear");
        return ::dsn::ERR_OK;
    }

    if (!clear_state) {
        rocksdb::Status flush_status = _db->Flush(rocksdb::FlushOptions());
        if (!flush_status.ok()) {
            derror("%s: flush memtable on close failed: %s",
                   replica_name(),
                   flush_status.ToString().c_str());
        }
    }

    _context_cache.clear();

    // cancel the sst-size timer task that start() scheduled
    if (_updating_task != nullptr) {
        _updating_task->cancel(true);
    }

    _is_open = false;
    delete _db;
    _db = nullptr;

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> guard(_checkpoints_lock);
        _checkpoints.clear();
        set_last_durable_decree(0);
    }

    if (clear_state) {
        const bool removed = ::dsn::utils::filesystem::remove_path(data_dir());
        if (!removed) {
            derror(
                "%s: clear directory %s failed when stop app", replica_name(), data_dir().c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
        _pfc_sst_count->set(0);
        _pfc_sst_size->set(0);
    }

    const char *clear_state_str = clear_state ? "true" : "false";
    ddebug("%s: close app succeed, clear_state = %s", replica_name(), clear_state_str);
    return ::dsn::ERR_OK;
}

// Scope guard around a boolean "checkpointing in progress" flag.
//
// The constructor tries to flip `flag` from false to true atomically
// (compare_exchange_strong). token_got() reports whether this instance won that race.
// The destructor resets the flag to false only if this instance acquired it, so a losing
// guard never clears a token held by someone else.
//
// The guard is non-copyable: a copy would also believe it owns the token and would
// release it a second time in its destructor.
class CheckpointingTokenHelper
{
public:
    explicit CheckpointingTokenHelper(std::atomic_bool &flag) : _flag(flag), _token_got(false)
    {
        bool expected = false;
        _token_got = _flag.compare_exchange_strong(expected, true);
    }
    ~CheckpointingTokenHelper()
    {
        if (_token_got)
            _flag.store(false);
    }

    CheckpointingTokenHelper(const CheckpointingTokenHelper &) = delete;
    CheckpointingTokenHelper &operator=(const CheckpointingTokenHelper &) = delete;

    // true iff this guard acquired (and will release) the token
    bool token_got() const { return _token_got; }

private:
    std::atomic_bool &_flag;
    bool _token_got;
};

1889
// Synchronously creates a rocksdb checkpoint for last_committed_decree() in
// <data_dir>/checkpoint.<decree>. On success it appends the decree to _checkpoints,
// advances last_durable_decree(), and runs gc_checkpoints().
//
// Returns:
//   ERR_WRONG_TIMING          if another checkpoint holds the _is_checkpointing token;
//   ERR_OK                    if already durable up to the committed decree, or on success;
//   ERR_FILE_OPERATION_FAILED if a stale checkpoint directory cannot be removed;
//   ERR_LOCAL_APP_FAILURE     if the Checkpoint object or the checkpoint itself fails.
::dsn::error_code pegasus_server_impl::sync_checkpoint()
{
    CheckpointingTokenHelper token_helper(_is_checkpointing);
    if (!token_helper.token_got())
        return ::dsn::ERR_WRONG_TIMING;

    int64_t last_durable = last_durable_decree();
    int64_t last_commit = last_committed_decree();
    dassert(last_durable <= last_commit, "%" PRId64 " VS %" PRId64, last_durable, last_commit);

    if (last_durable == last_commit) {
        ddebug("%s: no need to checkpoint because "
               "last_durable_decree = last_committed_decree = %" PRId64,
               replica_name(),
               last_durable);
        return ::dsn::ERR_OK;
    }

    rocksdb::Checkpoint *raw_chkpt = nullptr;
    auto status = rocksdb::Checkpoint::Create(_db, &raw_chkpt);
    if (!status.ok()) {
        derror("%s: create Checkpoint object failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }
    // Own the Checkpoint object right away so it is released on every path below.
    std::unique_ptr<rocksdb::Checkpoint> chkpt(raw_chkpt);

    auto dir = chkpt_get_dir_name(last_commit);
    auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), dir);
    if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
        ddebug("%s: checkpoint directory %s already exist, remove it first",
               replica_name(),
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
            derror(
                "%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    // CreateCheckpoint() will always flush memtable firstly.
    status = chkpt->CreateCheckpoint(chkpt_dir, 0);
    if (!status.ok()) {
        // sometimes checkpoint may fail, and try again will succeed
        derror("%s: create checkpoint failed, error = %s, try again",
               replica_name(),
               status.ToString().c_str());
        status = chkpt->CreateCheckpoint(chkpt_dir, 0);
    }

    // destroy the Checkpoint object as soon as it is no longer needed
    chkpt.reset();

    if (!status.ok()) {
        derror(
            "%s: create checkpoint failed, error = %s", replica_name(), status.ToString().c_str());
        // Remove whatever the failed attempts left behind, checking the result of this
        // single call.
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
            derror("%s: remove damaged checkpoint directory %s failed",
                   replica_name(),
                   chkpt_dir.c_str());
        }
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
        dassert(last_commit > last_durable_decree(),
                "%" PRId64 " VS %" PRId64 "",
                last_commit,
                last_durable_decree());
        dassert(last_commit == _db->GetLastFlushedDecree(),
                "%" PRId64 " VS %" PRId64 "",
                last_commit,
                _db->GetLastFlushedDecree());
        if (!_checkpoints.empty()) {
            dassert(last_commit > _checkpoints.back(),
                    "%" PRId64 " VS %" PRId64 "",
                    last_commit,
                    _checkpoints.back());
        }
        _checkpoints.push_back(last_commit);
        set_last_durable_decree(_checkpoints.back());
    }

    ddebug("%s: sync create checkpoint succeed, last_durable_decree = %" PRId64 "",
           replica_name(),
           last_durable_decree());

    gc_checkpoints();

    return ::dsn::ERR_OK;
}

// Must be thread safe.
1987
::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
Q
qinzuoyan 已提交
1988 1989 1990 1991 1992
{
    CheckpointingTokenHelper token_helper(_is_checkpointing);
    if (!token_helper.token_got())
        return ::dsn::ERR_WRONG_TIMING;

1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009
    int64_t last_durable = last_durable_decree();
    int64_t last_flushed = static_cast<int64_t>(_db->GetLastFlushedDecree());
    int64_t last_commit = last_committed_decree();

    dassert(last_durable <= last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
    dassert(last_flushed <= last_commit, "%" PRId64 " VS %" PRId64, last_flushed, last_commit);

    if (last_durable == last_commit) {
        ddebug("%s: no need to checkpoint because "
               "last_durable_decree = last_committed_decree = %" PRId64,
               replica_name(),
               last_durable);
        return ::dsn::ERR_OK;
    }

    if (last_durable == last_flushed) {
        if (flush_memtable) {
Q
qinzuoyan 已提交
2010 2011 2012 2013 2014
            // trigger flushing memtable, but not wait
            rocksdb::FlushOptions options;
            options.wait = false;
            auto status = _db->Flush(options);
            if (status.ok()) {
2015
                ddebug("%s: trigger flushing memtable succeed", replica_name());
Q
qinzuoyan 已提交
2016 2017 2018
                return ::dsn::ERR_TRY_AGAIN;
            } else {
                derror("%s: trigger flushing memtable failed, error = %s",
2019
                       replica_name(),
Q
qinzuoyan 已提交
2020 2021 2022 2023
                       status.ToString().c_str());
                return ::dsn::ERR_LOCAL_APP_FAILURE;
            }
        } else {
2024
            return ::dsn::ERR_OK;
Q
qinzuoyan 已提交
2025 2026 2027
        }
    }

2028
    dassert(last_durable < last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
Q
qinzuoyan 已提交
2029 2030 2031

    char buf[256];
    sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us());
2032
    std::string tmp_dir = ::dsn::utils::filesystem::path_combine(data_dir(), buf);
Q
qinzuoyan 已提交
2033 2034
    if (::dsn::utils::filesystem::directory_exists(tmp_dir)) {
        ddebug("%s: temporary checkpoint directory %s already exist, remove it first",
2035
               replica_name(),
Q
qinzuoyan 已提交
2036 2037 2038
               tmp_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
            derror("%s: remove temporary checkpoint directory %s failed",
2039
                   replica_name(),
Q
qinzuoyan 已提交
2040 2041 2042 2043
                   tmp_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }
2044

2045 2046 2047 2048
    int64_t checkpoint_decree = 0;
    ::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree);
    if (err != ::dsn::ERR_OK) {
        derror("%s: call copy_checkpoint_to_dir_unsafe failed with err = %s",
2049
               replica_name(),
2050 2051
               err.to_string());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
Q
qinzuoyan 已提交
2052 2053
    }

2054
    auto chkpt_dir =
2055
        ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(checkpoint_decree));
Q
qinzuoyan 已提交
2056 2057
    if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
        ddebug("%s: checkpoint directory %s already exist, remove it first",
2058
               replica_name(),
Q
qinzuoyan 已提交
2059 2060
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
2061 2062
            derror(
                "%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
Q
qinzuoyan 已提交
2063 2064
            if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
                derror("%s: remove temporary checkpoint directory %s failed",
2065
                       replica_name(),
Q
qinzuoyan 已提交
2066 2067 2068 2069 2070 2071 2072 2073
                       tmp_dir.c_str());
            }
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    if (!::dsn::utils::filesystem::rename_path(tmp_dir, chkpt_dir)) {
        derror("%s: rename checkpoint directory from %s to %s failed",
2074
               replica_name(),
Q
qinzuoyan 已提交
2075 2076 2077 2078
               tmp_dir.c_str(),
               chkpt_dir.c_str());
        if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
            derror("%s: remove temporary checkpoint directory %s failed",
2079
                   replica_name(),
Q
qinzuoyan 已提交
2080 2081 2082 2083 2084 2085 2086
                   tmp_dir.c_str());
        }
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    {
        ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
2087 2088 2089 2090
        dassert(checkpoint_decree > last_durable_decree(),
                "%" PRId64 " VS %" PRId64 "",
                checkpoint_decree,
                last_durable_decree());
Q
qinzuoyan 已提交
2091
        if (!_checkpoints.empty()) {
2092 2093 2094 2095
            dassert(checkpoint_decree > _checkpoints.back(),
                    "%" PRId64 " VS %" PRId64 "",
                    checkpoint_decree,
                    _checkpoints.back());
Q
qinzuoyan 已提交
2096
        }
2097
        _checkpoints.push_back(checkpoint_decree);
Q
qinzuoyan 已提交
2098 2099 2100 2101
        set_last_durable_decree(_checkpoints.back());
    }

    ddebug("%s: async create checkpoint succeed, last_durable_decree = %" PRId64 "",
2102
           replica_name(),
Q
qinzuoyan 已提交
2103 2104 2105 2106 2107 2108 2109
           last_durable_decree());

    gc_checkpoints();

    return ::dsn::ERR_OK;
}

2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131
// Must be thread safe: acquires the checkpointing token before delegating to
// copy_checkpoint_to_dir_unsafe(). Returns ERR_WRONG_TIMING when another checkpoint
// operation currently holds the token.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir,
                                                              /*output*/ int64_t *last_decree)
{
    CheckpointingTokenHelper checkpoint_guard(_is_checkpointing);
    if (checkpoint_guard.token_got())
        return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree);

    return ::dsn::ERR_WRONG_TIMING;
}

// Not thread safe: the caller must serialize checkpoint operations (copy_checkpoint_to_dir()
// and async_checkpoint() hold the checkpointing token before calling this).
//
// Creates a rocksdb checkpoint of the current db in `checkpoint_dir`, removing any existing
// directory of that name first. On success, if `checkpoint_decree` is non-null it receives
// the decree reported by CreateCheckpointQuick().
//
// Returns ERR_OK on success, ERR_FILE_OPERATION_FAILED if a pre-existing directory cannot be
// removed, and ERR_LOCAL_APP_FAILURE if rocksdb fails to create the Checkpoint object or the
// checkpoint itself (a partially written directory is removed on a best-effort basis).
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
                                                                     int64_t *checkpoint_decree)
{
    rocksdb::Checkpoint *raw_chkpt = nullptr;
    rocksdb::Status status = rocksdb::Checkpoint::Create(_db, &raw_chkpt);
    // Take ownership right away so the Checkpoint object is released on every return path,
    // including the early return when a stale directory cannot be removed.
    std::unique_ptr<rocksdb::Checkpoint> chkpt(raw_chkpt);
    if (!status.ok()) {
        derror("%s: create Checkpoint object failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
        ddebug("%s: checkpoint directory %s is already exist, remove it first",
               replica_name(),
               checkpoint_dir);
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    uint64_t ci = 0;
    status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci);
    chkpt.reset();

    if (!status.ok()) {
        derror("%s: async create checkpoint failed, error = %s",
               replica_name(),
               status.ToString().c_str());
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
        }
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // Log the signed value callers receive so that it matches the PRId64 format specifier.
    const int64_t decree = static_cast<int64_t>(ci);
    ddebug("%s: copy checkpoint to dir(%s) succeed, last_decree = %" PRId64 "",
           replica_name(),
           checkpoint_dir,
           decree);
    if (checkpoint_decree != nullptr) {
        *checkpoint_decree = decree;
    }

    return ::dsn::ERR_OK;
}

Q
qinzuoyan 已提交
2172
// Describes the latest durable checkpoint for a learner: fills `state.files` with every file
// under data_dir()/checkpoint.<last_durable_decree> and sets the decree range to
// (0, last_durable_decree]. `learn_start` and `learn_request` are not consulted.
//
// Returns ERR_OBJECT_NOT_FOUND when last_durable_decree() is 0, ERR_FILE_OPERATION_FAILED when
// the directory cannot be listed, and ERR_OK otherwise.
::dsn::error_code pegasus_server_impl::get_checkpoint(int64_t learn_start,
                                                      const dsn::blob &learn_request,
                                                      dsn::replication::learn_state &state)
{
    dassert(_is_open, "");

    const int64_t durable_decree = last_durable_decree();
    if (durable_decree == 0) {
        derror("%s: no checkpoint found", replica_name());
        return ::dsn::ERR_OBJECT_NOT_FOUND;
    }

    const std::string chkpt_dir =
        ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(durable_decree));
    state.files.clear();
    const bool listed = ::dsn::utils::filesystem::get_subfiles(chkpt_dir, state.files, true);
    if (!listed) {
        derror("%s: list files in checkpoint dir %s failed", replica_name(), chkpt_dir.c_str());
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    state.from_decree_excluded = 0;
    state.to_decree_included = durable_decree;

    ddebug("%s: get checkpoint succeed, from_decree_excluded = 0, to_decree_included = %" PRId64 "",
           replica_name(),
           state.to_decree_included);
    return ::dsn::ERR_OK;
}

2200 2201 2202
// Applies a learned checkpoint described by `state`.
//
// mode == copy: renames the directory containing the learned files to
//   data_dir()/checkpoint.<to_decree_included> and records that decree as the new
//   last durable decree. The running db is left untouched.
// otherwise: stops the db if it is open, wipes data_dir(), moves the learned files into
//   data_dir()/rdb (or starts from an empty db when no files were learned), and reopens.
//
// Returns ERR_OK on success, ERR_FILE_OPERATION_FAILED for filesystem failures (including a
// copy-mode request with no learned files), or the error from stop()/start().
::dsn::error_code
pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode,
                                              const dsn::replication::learn_state &state)
{
    ::dsn::error_code err;
    int64_t ci = state.to_decree_included;

    if (mode == chkpt_apply_mode::copy) {
        dassert(ci > last_durable_decree(),
                "state.to_decree_included(%" PRId64 ") <= last_durable_decree(%" PRId64 ")",
                ci,
                last_durable_decree());

        // The learned directory is derived from the first file, so at least one is required.
        if (state.files.empty()) {
            derror("%s: no learned files to apply in copy mode", replica_name());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        auto learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
        auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(ci));
        if (::dsn::utils::filesystem::rename_path(learn_dir, chkpt_dir)) {
            ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
            dassert(ci > last_durable_decree(),
                    "%" PRId64 " VS %" PRId64 "",
                    ci,
                    last_durable_decree());
            // Checkpoint decrees must be strictly increasing. Compare against the current tail
            // BEFORE appending; after push_back the tail would be `ci` itself.
            if (!_checkpoints.empty()) {
                dassert(ci > _checkpoints.back(),
                        "%" PRId64 " VS %" PRId64 "",
                        ci,
                        _checkpoints.back());
            }
            _checkpoints.push_back(ci);
            set_last_durable_decree(ci);
            err = ::dsn::ERR_OK;
        } else {
            derror("%s: rename directory %s to %s failed",
                   replica_name(),
                   learn_dir.c_str(),
                   chkpt_dir.c_str());
            err = ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        return err;
    }

    if (_is_open) {
        err = stop(true);
        if (err != ::dsn::ERR_OK) {
            // The format must consume exactly the two arguments supplied.
            derror("%s: close rocksdb failed, error = %s", replica_name(), err.to_string());
            return err;
        }
    }

    // clear data dir
    if (!::dsn::utils::filesystem::remove_path(data_dir())) {
        derror("%s: clear data directory %s failed", replica_name(), data_dir().c_str());
        return ::dsn::ERR_FILE_OPERATION_FAILED;
    }

    // reopen the db with the new checkpoint files
    if (state.files.size() > 0) {
        // create data dir
        if (!::dsn::utils::filesystem::create_directory(data_dir())) {
            derror("%s: create data directory %s failed", replica_name(), data_dir().c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        // move learned files from learn_dir to data_dir/rdb
        std::string learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
        std::string new_dir = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
        if (!::dsn::utils::filesystem::rename_path(learn_dir, new_dir)) {
            derror("%s: rename directory %s to %s failed",
                   replica_name(),
                   learn_dir.c_str(),
                   new_dir.c_str());
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }

        err = start(0, nullptr);
    } else {
        ddebug("%s: apply empty checkpoint, create new rocksdb", replica_name());
        err = start(0, nullptr);
    }

    if (err != ::dsn::ERR_OK) {
        derror("%s: open rocksdb failed, error = %s", replica_name(), err.to_string());
        return err;
    }

    dassert(_is_open, "");
    dassert(ci == last_durable_decree(), "%" PRId64 " VS %" PRId64 "", ci, last_durable_decree());

    ddebug("%s: apply checkpoint succeed, last_durable_decree = %" PRId64,
           replica_name(),
           last_durable_decree());
    return ::dsn::ERR_OK;
}

2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347
// Returns true iff `filter_type` lies in the inclusive range [FT_NO_FILTER, FT_MATCH_POSTFIX].
bool pegasus_server_impl::is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
{
    if (filter_type < ::dsn::apps::filter_type::FT_NO_FILTER)
        return false;
    return !(filter_type > ::dsn::apps::filter_type::FT_MATCH_POSTFIX);
}

// Checks `value` against `filter_pattern` according to `filter_type`:
//   FT_NO_FILTER or an empty pattern -> always true;
//   FT_MATCH_ANYWHERE               -> pattern occurs somewhere in value;
//   FT_MATCH_PREFIX                 -> value starts with pattern;
//   FT_MATCH_POSTFIX                -> value ends with pattern.
// A value shorter than a non-empty pattern never matches. Any other type trips dassert.
bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type,
                                          const ::dsn::blob &filter_pattern,
                                          const ::dsn::blob &value)
{
    const bool no_filter = filter_type == ::dsn::apps::filter_type::FT_NO_FILTER;
    if (no_filter || filter_pattern.length() == 0)
        return true;
    if (filter_pattern.length() > value.length())
        return false;

    const char *const val_begin = value.data();
    const char *const val_end = val_begin + value.length();
    const char *const pat_begin = filter_pattern.data();
    const size_t pat_len = filter_pattern.length();

    switch (filter_type) {
    case ::dsn::apps::filter_type::FT_MATCH_ANYWHERE:
        // Naive substring search. TODO: consider a faster algorithm, see
        //   http://old.blog.phusion.nl/2010/12/06/efficient-substring-searching/
        return std::search(val_begin, val_end, pat_begin, pat_begin + pat_len) != val_end;
    case ::dsn::apps::filter_type::FT_MATCH_PREFIX:
        return ::memcmp(val_begin, pat_begin, pat_len) == 0;
    case ::dsn::apps::filter_type::FT_MATCH_POSTFIX:
        return ::memcmp(val_end - pat_len, pat_begin, pat_len) == 0;
    default:
        dassert(false, "unsupported filter type: %d", filter_type);
    }
    return false;
}

// Converts one rocksdb record into a key_value and appends it to `kvs` for a scan.
// Return codes:
//   1 - appended;
//   2 - skipped because the record has expired (non-zero expire_ts <= epoch_now);
//   3 - skipped by the hash-key or sort-key filter.
// The appended key is the full raw key copied into a new buffer. The value is omitted
// when `no_value` is set.
int pegasus_server_impl::append_key_value_for_scan(
    std::vector<::dsn::apps::key_value> &kvs,
    const rocksdb::Slice &key,
    const rocksdb::Slice &value,
    ::dsn::apps::filter_type::type hash_key_filter_type,
    const ::dsn::blob &hash_key_filter_pattern,
    ::dsn::apps::filter_type::type sort_key_filter_type,
    const ::dsn::blob &sort_key_filter_pattern,
    uint32_t epoch_now,
    bool no_value)
{
    const uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
    if (expire_ts != 0 && epoch_now >= expire_ts) {
        if (_verbose_log)
            derror("%s: rocksdb data expired for scan", replica_name());
        return 2;
    }

    ::dsn::blob raw_key(key.data(), 0, key.size());

    const bool check_hash_key = hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER;
    const bool check_sort_key = sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER;
    if (check_hash_key || check_sort_key) {
        // Filters apply to the individual components, so split the stored key first.
        ::dsn::blob hash_key, sort_key;
        pegasus_restore_key(raw_key, hash_key, sort_key);
        if (check_hash_key &&
            !validate_filter(hash_key_filter_type, hash_key_filter_pattern, hash_key)) {
            if (_verbose_log)
                derror("%s: hash key filtered for scan", replica_name());
            return 3;
        }
        if (check_sort_key &&
            !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
            if (_verbose_log)
                derror("%s: sort key filtered for scan", replica_name());
            return 3;
        }
    }

    ::dsn::apps::key_value kv;

    // Copy the raw key into a freshly allocated buffer owned by the result.
    const auto key_len = raw_key.length();
    std::shared_ptr<char> key_copy(::dsn::utils::make_shared_array<char>(key_len));
    ::memcpy(key_copy.get(), raw_key.data(), key_len);
    kv.key.assign(std::move(key_copy), 0, key_len);

    if (!no_value) {
        std::unique_ptr<std::string> value_copy(new std::string(value.data(), value.size()));
        pegasus_extract_user_data(_value_schema_version, std::move(value_copy), kv.value);
    }

    kvs.emplace_back(std::move(kv));
    return 1;
}

2394 2395 2396 2397 2398 2399 2400 2401
// Converts one rocksdb record into a key_value and appends it to `kvs` for multi_get.
// Unlike the scan variant, the appended key is only the sort key.
// Return codes:
//   1 - appended;
//   2 - skipped because the record has expired (non-zero expire_ts <= epoch_now);
//   3 - skipped by the sort-key filter.
int pegasus_server_impl::append_key_value_for_multi_get(
    std::vector<::dsn::apps::key_value> &kvs,
    const rocksdb::Slice &key,
    const rocksdb::Slice &value,
    ::dsn::apps::filter_type::type sort_key_filter_type,
    const ::dsn::blob &sort_key_filter_pattern,
    uint32_t epoch_now,
    bool no_value)
{
    const uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, value);
    if (expire_ts != 0 && epoch_now >= expire_ts) {
        if (_verbose_log)
            derror("%s: rocksdb data expired for multi get", replica_name());
        return 2;
    }

    // Split the stored key; only the sort key is filtered and returned.
    ::dsn::blob raw_key(key.data(), 0, key.size());
    ::dsn::blob hash_key, sort_key;
    pegasus_restore_key(raw_key, hash_key, sort_key);

    const bool filtered_out =
        sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
        !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key);
    if (filtered_out) {
        if (_verbose_log)
            derror("%s: sort key filtered for multi get", replica_name());
        return 3;
    }

    ::dsn::apps::key_value kv;

    const auto sort_key_len = sort_key.length();
    std::shared_ptr<char> sort_key_copy(::dsn::utils::make_shared_array<char>(sort_key_len));
    ::memcpy(sort_key_copy.get(), sort_key.data(), sort_key_len);
    kv.key.assign(std::move(sort_key_copy), 0, sort_key_len);

    if (!no_value) {
        std::unique_ptr<std::string> value_copy(new std::string(value.data(), value.size()));
        pegasus_extract_user_data(_value_schema_version, std::move(value_copy), kv.value);
    }

    kvs.emplace_back(std::move(kv));
    return 1;
}

// Counts the entries returned by get_subfiles(path, ..., false) whose name ends with `type`
// (and is strictly longer than it), and sums their sizes.
// Returns (count, total_bytes), or (-1, -1) if listing the directory or reading any matching
// file's size fails.
static std::pair<int64_t, int64_t> get_type_file_size(const std::string &path,
                                                      const std::string &type)
{
    const std::pair<int64_t, int64_t> failure(-1, -1);

    std::vector<std::string> files;
    if (!::dsn::utils::filesystem::get_subfiles(path, files, false)) {
        dwarn("get subfiles of dir %s failed", path.c_str());
        return failure;
    }

    int64_t matched_count = 0;
    int64_t total_bytes = 0;
    const size_t suffix_len = type.length();
    for (auto &file : files) {
        const bool has_suffix = file.length() > suffix_len &&
                                file.compare(file.length() - suffix_len, suffix_len, type) == 0;
        if (!has_suffix)
            continue;

        int64_t file_bytes = 0;
        if (!::dsn::utils::filesystem::file_size(file, file_bytes)) {
            dwarn("get size of file %s failed", file.c_str());
            return failure;
        }
        total_bytes += file_bytes;
        ++matched_count;
    }
    return std::pair<int64_t, int64_t>(matched_count, total_bytes);
}

std::pair<int64_t, int64_t> pegasus_server_impl::statistic_sst_size()
{
2466 2467
    // dir = data_dir()/rdb
    return get_type_file_size(::dsn::utils::filesystem::path_combine(data_dir(), "rdb"), ".sst");
Q
qinzuoyan 已提交
2468 2469 2470 2471 2472 2473
}

void pegasus_server_impl::updating_rocksdb_sstsize()
{
    std::pair<int64_t, int64_t> sst_size = statistic_sst_size();
    if (sst_size.first == -1) {
2474
        dwarn("%s: statistic sst file size failed", replica_name());
Q
qinzuoyan 已提交
2475 2476 2477 2478
    } else {
        int64_t sst_size_mb = sst_size.second / 1048576;
        ddebug("%s: statistic sst file size succeed, sst_count = %" PRId64 ", sst_size = %" PRId64
               "(%" PRId64 "MB)",
2479
               replica_name(),
Q
qinzuoyan 已提交
2480 2481 2482
               sst_size.first,
               sst_size.second,
               sst_size_mb);
2483 2484
        _pfc_sst_count->set(sst_size.first);
        _pfc_sst_size->set(sst_size_mb);
Q
qinzuoyan 已提交
2485 2486
    }
}
C
cailiuyang 已提交
2487

2488 2489
std::pair<std::string, bool>
pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::string> &env_kvs)
C
cailiuyang 已提交
2490 2491
{
    std::pair<std::string, bool> res;
2492 2493 2494
    std::stringstream os;
    os << "restore.";

2495
    auto it = env_kvs.find(ROCKSDB_ENV_RESTORE_FORCE_RESTORE);
C
cailiuyang 已提交
2496
    if (it != env_kvs.end()) {
2497
        ddebug("%s: found %s in envs", replica_name(), ROCKSDB_ENV_RESTORE_FORCE_RESTORE.c_str());
C
cailiuyang 已提交
2498 2499 2500
        res.second = true;
    }

2501
    it = env_kvs.find(ROCKSDB_ENV_RESTORE_POLICY_NAME);
C
cailiuyang 已提交
2502
    if (it != env_kvs.end()) {
2503 2504 2505 2506
        ddebug("%s: found %s in envs: %s",
               replica_name(),
               ROCKSDB_ENV_RESTORE_POLICY_NAME.c_str(),
               it->second.c_str());
C
cailiuyang 已提交
2507 2508 2509 2510
        os << it->second << ".";
    } else {
        return res;
    }
2511

2512
    it = env_kvs.find(ROCKSDB_ENV_RESTORE_BACKUP_ID);
C
cailiuyang 已提交
2513
    if (it != env_kvs.end()) {
2514 2515 2516 2517
        ddebug("%s: found %s in envs: %s",
               replica_name(),
               ROCKSDB_ENV_RESTORE_BACKUP_ID.c_str(),
               it->second.c_str());
C
cailiuyang 已提交
2518 2519 2520 2521
        os << it->second;
    } else {
        return res;
    }
2522

2523
    std::string parent_dir = ::dsn::utils::filesystem::remove_file_name(data_dir());
C
cailiuyang 已提交
2524 2525 2526
    res.first = ::dsn::utils::filesystem::path_combine(parent_dir, os.str());
    return res;
}
A
acelyc111 已提交
2527

2528 2529 2530 2531
// args:
//   ROCKSDB_ENV_MANUAL_COMPACT_TARGET_LEVEL_KEY (default -1)
//   ROCKSDB_ENV_BOTTOMMOST_LEVEL_COMPACTION_KEY (default skip)
void pegasus_server_impl::manual_compact(const std::map<std::string, std::string> &opts)
A
acelyc111 已提交
2532
{
2533
    uint64_t start_time;
2534 2535 2536 2537
    rocksdb::Status status;

    // wait flush before compact to make all data compacted.
    ddebug("%s: start to Flush", replica_name());
2538
    start_time = dsn_now_ms();
2539
    status = _db->Flush(rocksdb::FlushOptions());
2540 2541 2542 2543
    ddebug("%s: Flush finished, status = %s, time_used = %" PRIu64 "ms",
           replica_name(),
           status.ToString().c_str(),
           dsn_now_ms() - start_time);
2544

A
acelyc111 已提交
2545 2546 2547 2548
    rocksdb::CompactRangeOptions options;
    options.exclusive_manual_compaction = true;
    options.change_level = true;
    options.target_level = -1;
2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586
    auto find1 = opts.find(ROCKSDB_MANUAL_COMPACT_TARGET_LEVEL_KEY);
    if (find1 != opts.end()) {
        const std::string &argv = find1->second;
        int target_level;
        if (pegasus::utils::buf2int(argv.c_str(), argv.size(), target_level) && target_level >= 1 &&
            target_level <= _db_opts.num_levels) {
            options.target_level = target_level;
        } else {
            derror("%s: invalid option value [%s] for %s, use default value [%d]",
                   replica_name(),
                   argv.c_str(),
                   ROCKSDB_MANUAL_COMPACT_TARGET_LEVEL_KEY.c_str(),
                   -1);
        }
    }
    options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kSkip;
    auto find2 = opts.find(ROCKSDB_MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY);
    if (find2 != opts.end()) {
        const std::string &argv = find2->second;
        if (argv == ROCKSDB_MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
            options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
        } else if (argv == ROCKSDB_MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
            options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kSkip;
        } else {
            derror("%s: invalid option value [%s] for %s, use default value [%s]",
                   replica_name(),
                   argv.c_str(),
                   ROCKSDB_MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY.c_str(),
                   ROCKSDB_MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP.c_str());
        }
    }
    ddebug("%s: start to CompactRange, target_level = %d, bottommost_level_compaction = %s",
           replica_name(),
           options.target_level,
           options.bottommost_level_compaction == rocksdb::BottommostLevelCompaction::kForce
               ? "force"
               : "skip");
    start_time = dsn_now_ms();
2587
    status = _db->CompactRange(options, nullptr, nullptr);
2588 2589 2590 2591
    ddebug("%s: CompactRange finished, status = %s, time_used = %" PRIu64 "ms",
           replica_name(),
           status.ToString().c_str(),
           dsn_now_ms() - start_time);
2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710
}

void pegasus_server_impl::update_app_envs(const std::map<std::string, std::string> &envs)
{
    // update usage scenario
    // if not specified, default is normal
    auto find = envs.find(ROCKSDB_ENV_USAGE_SCENARIO_KEY);
    std::string new_usage_scenario =
        (find != envs.end() ? find->second : ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
    if (new_usage_scenario != _usage_scenario) {
        std::string old_usage_scenario = _usage_scenario;
        if (set_usage_scenario(new_usage_scenario)) {
            ddebug("%s: update app env[%s] from %s to %s succeed",
                   replica_name(),
                   ROCKSDB_ENV_USAGE_SCENARIO_KEY.c_str(),
                   old_usage_scenario.c_str(),
                   new_usage_scenario.c_str());
        } else {
            derror("%s: update app env[%s] from %s to %s failed",
                   replica_name(),
                   ROCKSDB_ENV_USAGE_SCENARIO_KEY.c_str(),
                   old_usage_scenario.c_str(),
                   new_usage_scenario.c_str());
        }
    }
}

// Reports the current app envs of this replica into `envs`, overwriting any existing entry for
// ROCKSDB_ENV_USAGE_SCENARIO_KEY. Other entries are left as they are.
void pegasus_server_impl::query_app_envs(/*out*/std::map<std::string, std::string> &envs)
{
    std::string &scenario_slot = envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY];
    scenario_slot = _usage_scenario;
}

bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
{
    if (usage_scenario == _usage_scenario)
        return false;
    std::unordered_map<std::string, std::string> new_options;
    if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ||
        usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) {
        if (_usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
            // old usage scenario is bulk load, reset first
            new_options["level0_file_num_compaction_trigger"] =
                boost::lexical_cast<std::string>(_db_opts.level0_file_num_compaction_trigger);
            new_options["level0_slowdown_writes_trigger"] =
                boost::lexical_cast<std::string>(_db_opts.level0_slowdown_writes_trigger);
            new_options["level0_stop_writes_trigger"] =
                boost::lexical_cast<std::string>(_db_opts.level0_stop_writes_trigger);
            new_options["soft_pending_compaction_bytes_limit"] =
                boost::lexical_cast<std::string>(_db_opts.soft_pending_compaction_bytes_limit);
            new_options["hard_pending_compaction_bytes_limit"] =
                boost::lexical_cast<std::string>(_db_opts.hard_pending_compaction_bytes_limit);
            new_options["disable_auto_compactions"] = "false";
            new_options["max_compaction_bytes"] =
                boost::lexical_cast<std::string>(_db_opts.max_compaction_bytes);
            new_options["write_buffer_size"] =
                boost::lexical_cast<std::string>(_db_opts.write_buffer_size);
            new_options["max_write_buffer_number"] =
                boost::lexical_cast<std::string>(_db_opts.max_write_buffer_number);
        }
        if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
            new_options["level0_file_num_compaction_trigger"] =
                boost::lexical_cast<std::string>(_db_opts.level0_file_num_compaction_trigger);
        } else {
            new_options["level0_file_num_compaction_trigger"] =
                boost::lexical_cast<std::string>(_db_opts.level0_file_num_compaction_trigger * 2);
        }
    } else if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
        // refer to Options::PrepareForBulkLoad()
        new_options["level0_file_num_compaction_trigger"] = "1000000000";
        new_options["level0_slowdown_writes_trigger"] = "1000000000";
        new_options["level0_stop_writes_trigger"] = "1000000000";
        new_options["soft_pending_compaction_bytes_limit"] = "0";
        new_options["hard_pending_compaction_bytes_limit"] = "0";
        new_options["disable_auto_compactions"] = "true";
        new_options["max_compaction_bytes"] =
            boost::lexical_cast<std::string>(static_cast<uint64_t>(1) << 60);
        new_options["write_buffer_size"] = boost::lexical_cast<std::string>(
            std::max(_db_opts.write_buffer_size, (size_t)(256 * 1024 * 1024)));
        new_options["max_write_buffer_number"] =
            boost::lexical_cast<std::string>(std::max(_db_opts.max_write_buffer_number, 6));
    } else {
        derror("%s: invalid usage scenario: %s", replica_name(), usage_scenario.c_str());
        return false;
    }
    if (set_options(new_options)) {
        _usage_scenario = usage_scenario;
        ddebug("%s: set usage scenario to %s succeed", replica_name(), usage_scenario.c_str());
        return true;
    } else {
        derror("%s: set usage scenario to %s failed", replica_name(), usage_scenario.c_str());
        return false;
    }
}

// Applies `new_options` through rocksdb DB::SetOptions() and logs them as "k1=v1,k2=v2,...".
// Returns true iff rocksdb reports success.
bool pegasus_server_impl::set_options(
    const std::unordered_map<std::string, std::string> &new_options)
{
    std::ostringstream rendered;
    bool first = true;
    for (const auto &option : new_options) {
        if (!first)
            rendered << ",";
        first = false;
        rendered << option.first << "=" << option.second;
    }

    rocksdb::Status status = _db->SetOptions(new_options);
    const bool succeed = status.ok();
    if (succeed) {
        ddebug("%s: rocksdb set options returns %s: {%s}",
               replica_name(),
               status.ToString().c_str(),
               rendered.str().c_str());
    } else {
        derror("%s: rocksdb set options returns %s: {%s}",
               replica_name(),
               status.ToString().c_str(),
               rendered.str().c_str());
    }
    return succeed;
}
Q
qinzuoyan 已提交
2712 2713
}
} // namespace