pegasus_write_service.cpp 12.7 KB
Newer Older
1 2 3 4
// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

5
#include "base/pegasus_rpc_types.h"
6 7
#include "pegasus_write_service.h"
#include "pegasus_write_service_impl.h"
8
#include "capacity_unit_calculator.h"
9

10 11
#include <dsn/cpp/message_utils.h>

12 13 14 15
namespace pegasus {
namespace server {

// Builds the write service for `server` and registers its perf counters.
//
// Every counter is registered in the "app.pegasus" section under the name
// "<metric>@<gpid>", where <gpid> is `server->get_gpid()` rendered with fmt.
// `server` is dereferenced unconditionally, so it must not be null.
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
    : _server(server),
      _impl(new impl(server)),
      _batch_start_time(0),
      _cu_calculator(server->_cu_calculator.get())
{
    const std::string gpid_text = fmt::format("{}", server->get_gpid());

    // ---- per-request-type QPS counters (COUNTER_TYPE_RATE) ----
    _pfc_put_qps.init_app_counter("app.pegasus",
                                  fmt::format("put_qps@{}", gpid_text).c_str(),
                                  COUNTER_TYPE_RATE,
                                  "statistic the qps of PUT request");

    _pfc_multi_put_qps.init_app_counter("app.pegasus",
                                        fmt::format("multi_put_qps@{}", gpid_text).c_str(),
                                        COUNTER_TYPE_RATE,
                                        "statistic the qps of MULTI_PUT request");

    _pfc_remove_qps.init_app_counter("app.pegasus",
                                     fmt::format("remove_qps@{}", gpid_text).c_str(),
                                     COUNTER_TYPE_RATE,
                                     "statistic the qps of REMOVE request");

    _pfc_multi_remove_qps.init_app_counter("app.pegasus",
                                           fmt::format("multi_remove_qps@{}", gpid_text).c_str(),
                                           COUNTER_TYPE_RATE,
                                           "statistic the qps of MULTI_REMOVE request");

    _pfc_incr_qps.init_app_counter("app.pegasus",
                                   fmt::format("incr_qps@{}", gpid_text).c_str(),
                                   COUNTER_TYPE_RATE,
                                   "statistic the qps of INCR request");

    _pfc_check_and_set_qps.init_app_counter("app.pegasus",
                                            fmt::format("check_and_set_qps@{}", gpid_text).c_str(),
                                            COUNTER_TYPE_RATE,
                                            "statistic the qps of CHECK_AND_SET request");

    _pfc_check_and_mutate_qps.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_mutate_qps@{}", gpid_text).c_str(),
        COUNTER_TYPE_RATE,
        "statistic the qps of CHECK_AND_MUTATE request");

    // ---- per-request-type latency counters (COUNTER_TYPE_NUMBER_PERCENTILES) ----
    _pfc_put_latency.init_app_counter("app.pegasus",
                                      fmt::format("put_latency@{}", gpid_text).c_str(),
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of PUT request");

    _pfc_multi_put_latency.init_app_counter("app.pegasus",
                                            fmt::format("multi_put_latency@{}", gpid_text).c_str(),
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_PUT request");

    _pfc_remove_latency.init_app_counter("app.pegasus",
                                         fmt::format("remove_latency@{}", gpid_text).c_str(),
                                         COUNTER_TYPE_NUMBER_PERCENTILES,
                                         "statistic the latency of REMOVE request");

    _pfc_multi_remove_latency.init_app_counter(
        "app.pegasus",
        fmt::format("multi_remove_latency@{}", gpid_text).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of MULTI_REMOVE request");

    _pfc_incr_latency.init_app_counter("app.pegasus",
                                       fmt::format("incr_latency@{}", gpid_text).c_str(),
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of INCR request");

    _pfc_check_and_set_latency.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_set_latency@{}", gpid_text).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of CHECK_AND_SET request");

    _pfc_check_and_mutate_latency.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_mutate_latency@{}", gpid_text).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of CHECK_AND_MUTATE request");

    // ---- duplication ----
    _pfc_duplicate_qps.init_app_counter("app.pegasus",
                                        fmt::format("duplicate_qps@{}", gpid_text).c_str(),
                                        COUNTER_TYPE_RATE,
                                        "statistic the qps of DUPLICATE requests");
}

108 109 110
// Nothing to release by hand; the destructor is defined in this translation
// unit, where pegasus_write_service_impl.h is included.
pegasus_write_service::~pegasus_write_service() = default;

// Forwards an empty put for `decree` to the implementation and returns its result.
int pegasus_write_service::empty_put(int64_t decree)
{
    const int rc = _impl->empty_put(decree);
    return rc;
}
111

112
int pegasus_write_service::multi_put(const db_write_context &ctx,
W
Wu Tao 已提交
113 114
                                     const dsn::apps::multi_put_request &update,
                                     dsn::apps::update_response &resp)
115 116 117
{
    uint64_t start_time = dsn_now_ns();
    _pfc_multi_put_qps->increment();
118
    int err = _impl->multi_put(ctx, update, resp);
119 120 121 122 123

    if (_server->is_primary()) {
        _cu_calculator->add_multi_put_cu(resp.error, update.kvs);
    }

124
    _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
W
Wu Tao 已提交
125
    return err;
126 127
}

W
Wu Tao 已提交
128 129 130
// Applies a MULTI_REMOVE write.
//
// Bumps the MULTI_REMOVE qps counter, delegates to the implementation, charges
// capacity units for `update.sort_keys` when this replica is primary, and
// records the elapsed dsn_now_ns() time in the MULTI_REMOVE latency counter.
// Returns whatever `_impl->multi_remove` returned.
int pegasus_write_service::multi_remove(int64_t decree,
                                        const dsn::apps::multi_remove_request &update,
                                        dsn::apps::multi_remove_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_multi_remove_qps->increment();

    const int rc = _impl->multi_remove(decree, update, resp);

    // Capacity units are only accounted on the primary replica.
    if (_server->is_primary()) {
        _cu_calculator->add_multi_remove_cu(resp.error, update.sort_keys);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_multi_remove_latency->set(elapsed_ns);
    return rc;
}

Q
QinZuoyan 已提交
144 145 146 147 148 149 150
// Applies an INCR write.
//
// Bumps the INCR qps counter, delegates to the implementation, charges
// capacity units when this replica is primary, and records the elapsed
// dsn_now_ns() time in the INCR latency counter.
// Returns whatever `_impl->incr` returned.
int pegasus_write_service::incr(int64_t decree,
                                const dsn::apps::incr_request &update,
                                dsn::apps::incr_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_incr_qps->increment();

    const int rc = _impl->incr(decree, update, resp);

    // Capacity units are only accounted on the primary replica.
    if (_server->is_primary()) {
        _cu_calculator->add_incr_cu(resp.error);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_incr_latency->set(elapsed_ns);
    return rc;
}

Q
QinZuoyan 已提交
160 161 162 163 164 165 166
// Applies a CHECK_AND_SET write.
//
// Bumps the CHECK_AND_SET qps counter, delegates to the implementation,
// charges capacity units for the set sort key / value when this replica is
// primary, and records the elapsed dsn_now_ns() time in the latency counter.
// Returns whatever `_impl->check_and_set` returned.
int pegasus_write_service::check_and_set(int64_t decree,
                                         const dsn::apps::check_and_set_request &update,
                                         dsn::apps::check_and_set_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_check_and_set_qps->increment();

    const int rc = _impl->check_and_set(decree, update, resp);

    // Capacity units are only accounted on the primary replica.
    if (_server->is_primary()) {
        _cu_calculator->add_check_and_set_cu(resp.error, update.set_sort_key, update.set_value);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_check_and_set_latency->set(elapsed_ns);
    return rc;
}

176 177 178 179 180 181 182
// Applies a CHECK_AND_MUTATE write.
//
// Bumps the CHECK_AND_MUTATE qps counter, delegates to the implementation,
// charges capacity units for `update.mutate_list` when this replica is
// primary, and records the elapsed dsn_now_ns() time in the latency counter.
// Returns whatever `_impl->check_and_mutate` returned.
int pegasus_write_service::check_and_mutate(int64_t decree,
                                            const dsn::apps::check_and_mutate_request &update,
                                            dsn::apps::check_and_mutate_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_check_and_mutate_qps->increment();

    const int rc = _impl->check_and_mutate(decree, update, resp);

    // Capacity units are only accounted on the primary replica.
    if (_server->is_primary()) {
        _cu_calculator->add_check_and_mutate_cu(resp.error, update.mutate_list);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_check_and_mutate_latency->set(elapsed_ns);
    return rc;
}

192
void pegasus_write_service::batch_prepare(int64_t decree)
193
{
194 195
    dassert(_batch_start_time == 0,
            "batch_prepare and batch_commit/batch_abort must be called in pair");
196

197
    _batch_start_time = dsn_now_ns();
198 199
}

200
int pegasus_write_service::batch_put(const db_write_context &ctx,
201 202
                                     const dsn::apps::update_request &update,
                                     dsn::apps::update_response &resp)
203
{
204 205 206 207
    dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");

    _batch_qps_perfcounters.push_back(_pfc_put_qps.get());
    _batch_latency_perfcounters.push_back(_pfc_put_latency.get());
208
    int err = _impl->batch_put(ctx, update, resp);
209

210 211 212 213 214
    if (_server->is_primary()) {
        _cu_calculator->add_put_cu(resp.error, update.key, update.value);
    }

    return err;
215 216
}

217 218 219
// Adds a REMOVE of `key` to the currently open batch.
//
// The REMOVE qps/latency counters are queued (not updated yet) so that
// clear_up_batch_states() can update them once the batch finishes. Capacity
// units for the key are charged when this replica is primary.
// Returns whatever `_impl->batch_remove` returned.
int pegasus_write_service::batch_remove(int64_t decree,
                                        const dsn::blob &key,
                                        dsn::apps::update_response &resp)
{
    dassert(_batch_start_time != 0, "batch_remove must be called after batch_prepare");

    // Remember which counters this batch has to update when it completes.
    _batch_qps_perfcounters.emplace_back(_pfc_remove_qps.get());
    _batch_latency_perfcounters.emplace_back(_pfc_remove_latency.get());

    const int rc = _impl->batch_remove(decree, key, resp);

    // Capacity units are only accounted on the primary replica.
    if (_server->is_primary()) {
        _cu_calculator->add_remove_cu(resp.error, key);
    }
    return rc;
}
233

234 235 236
int pegasus_write_service::batch_commit(int64_t decree)
{
    dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare");
237

238 239 240
    int err = _impl->batch_commit(decree);
    clear_up_batch_states();
    return err;
241 242
}

243
void pegasus_write_service::batch_abort(int64_t decree, int err)
244
{
245 246
    dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare");
    dassert(err, "must abort on non-zero err");
247

248 249
    _impl->batch_abort(decree, err);
    clear_up_batch_states();
250 251
}

252
// Forwards the default TTL setting to the implementation.
void pegasus_write_service::set_default_ttl(uint32_t ttl)
{
    _impl->set_default_ttl(ttl);
}
253

254
void pegasus_write_service::clear_up_batch_states()
255
{
256 257 258 259 260 261 262 263 264
    uint64_t latency = dsn_now_ns() - _batch_start_time;
    for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
        pfc->increment();
    for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
        pfc->set(latency);

    _batch_qps_perfcounters.clear();
    _batch_latency_perfcounters.clear();
    _batch_start_time = 0;
265 266
}

267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
// Applies a write carried by a duplicate request.
//
// The request is rejected with rocksdb::Status::kInvalidArgument (plus an error
// hint) and empty_put() is invoked for the decree when:
//   - request.cluster_id is not a configured cluster id,
//   - request.cluster_id equals this cluster's own id, or
//   - request.task_code is none of PUT / REMOVE / MULTI_PUT / MULTI_REMOVE.
// Otherwise request.raw_message is decoded into a write message and applied
// through `_impl` using a duplicate write context built from the request's
// timestamp, cluster id and verify_timetag flag.
//
// Returns the result of empty_put() on rejection, otherwise resp.error.
int pegasus_write_service::duplicate(int64_t decree,
                                     const dsn::apps::duplicate_request &request,
                                     dsn::apps::duplicate_response &resp)
{
    // Verifies the cluster_id.
    if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
        resp.__set_error(rocksdb::Status::kInvalidArgument);
        resp.__set_error_hint("request cluster id is unconfigured");
        return empty_put(decree);
    }
    if (request.cluster_id == get_current_cluster_id()) {
        resp.__set_error(rocksdb::Status::kInvalidArgument);
        resp.__set_error_hint("self-duplicating");
        return empty_put(decree);
    }

    _pfc_duplicate_qps->increment();

    const bool is_put = request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT;
    const bool is_remove = request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
    const bool is_multi_put = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT;
    const bool is_multi_remove = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE;

    bool is_delete = is_multi_remove || is_remove;
    auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete);
    auto ctx = db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag);

    // Reject unknown task codes BEFORE decoding the raw message: the decoded
    // message is a raw pointer that is only ever handed to one of the rpc
    // holders below, so decoding it on this path would leave it without an owner.
    // NOTE(review): presumably that is a leak; confirm against message_ex's
    // lifetime rules.
    if (!(is_put || is_remove || is_multi_put || is_multi_remove)) {
        resp.__set_error(rocksdb::Status::kInvalidArgument);
        resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code));
        return empty_put(ctx.decree);
    }

    dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, request.raw_message);

    if (is_multi_put) {
        multi_put_rpc rpc(write);
        resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response()));
        return resp.error;
    }
    if (is_multi_remove) {
        multi_remove_rpc rpc(write);
        resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response()));
        return resp.error;
    }

    // Single-key writes go through the batch interface of `_impl`. Both holders
    // live at function scope so that whichever one is filled stays alive until
    // batch_commit()/batch_abort() has run.
    // NOTE(review): presumably the batched write still references the request
    // at commit time; confirm before narrowing these scopes.
    put_rpc put;
    remove_rpc remove;
    int err = 0;
    if (is_put) {
        put = put_rpc(write);
        err = _impl->batch_put(ctx, put.request(), put.response());
    } else {
        // Only RPC_RRDB_RRDB_REMOVE can reach this branch.
        remove = remove_rpc(write);
        err = _impl->batch_remove(ctx.decree, remove.request(), remove.response());
    }

    if (!err) {
        err = _impl->batch_commit(ctx.decree);
    } else {
        _impl->batch_abort(ctx.decree, err);
    }
    resp.__set_error(err);
    return resp.error;
}

326 327
} // namespace server
} // namespace pegasus