pegasus_write_service.cpp 9.8 KB
Newer Older
1 2 3 4 5 6
// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "pegasus_write_service.h"
#include "pegasus_write_service_impl.h"
7
#include "capacity_unit_calculator.h"
8 9 10 11 12

namespace pegasus {
namespace server {

// Constructs the write service on top of `server`.
//
// Stores `server`, creates the `impl` object, resets the batch start time to 0 and keeps
// the raw pointer obtained from `server->_cu_calculator`. Then registers, for every write
// operation type, one COUNTER_TYPE_RATE counter ("*_qps") and one
// COUNTER_TYPE_NUMBER_PERCENTILES counter ("*_latency") under "app.pegasus". Each counter
// name is suffixed with "@" followed by the formatted value of `server->get_gpid()`.
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
    : _server(server),
      _impl(new impl(server)),
      _batch_start_time(0),
      _cu_calculator(server->_cu_calculator.get())
{
    const std::string gpid = fmt::format("{}", server->get_gpid());

    // The formatted name is a temporary that stays alive until the enclosing
    // init_app_counter() call has returned, so passing its c_str() is safe.

    // ---- request rate counters ----
    _pfc_put_qps.init_app_counter("app.pegasus",
                                  fmt::format("put_qps@{}", gpid).c_str(),
                                  COUNTER_TYPE_RATE,
                                  "statistic the qps of PUT request");

    _pfc_multi_put_qps.init_app_counter("app.pegasus",
                                        fmt::format("multi_put_qps@{}", gpid).c_str(),
                                        COUNTER_TYPE_RATE,
                                        "statistic the qps of MULTI_PUT request");

    _pfc_remove_qps.init_app_counter("app.pegasus",
                                     fmt::format("remove_qps@{}", gpid).c_str(),
                                     COUNTER_TYPE_RATE,
                                     "statistic the qps of REMOVE request");

    _pfc_multi_remove_qps.init_app_counter("app.pegasus",
                                           fmt::format("multi_remove_qps@{}", gpid).c_str(),
                                           COUNTER_TYPE_RATE,
                                           "statistic the qps of MULTI_REMOVE request");

    _pfc_incr_qps.init_app_counter("app.pegasus",
                                   fmt::format("incr_qps@{}", gpid).c_str(),
                                   COUNTER_TYPE_RATE,
                                   "statistic the qps of INCR request");

    _pfc_check_and_set_qps.init_app_counter("app.pegasus",
                                            fmt::format("check_and_set_qps@{}", gpid).c_str(),
                                            COUNTER_TYPE_RATE,
                                            "statistic the qps of CHECK_AND_SET request");

    _pfc_check_and_mutate_qps.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_mutate_qps@{}", gpid).c_str(),
        COUNTER_TYPE_RATE,
        "statistic the qps of CHECK_AND_MUTATE request");

    // ---- latency counters ----
    _pfc_put_latency.init_app_counter("app.pegasus",
                                      fmt::format("put_latency@{}", gpid).c_str(),
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of PUT request");

    _pfc_multi_put_latency.init_app_counter("app.pegasus",
                                            fmt::format("multi_put_latency@{}", gpid).c_str(),
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_PUT request");

    _pfc_remove_latency.init_app_counter("app.pegasus",
                                         fmt::format("remove_latency@{}", gpid).c_str(),
                                         COUNTER_TYPE_NUMBER_PERCENTILES,
                                         "statistic the latency of REMOVE request");

    _pfc_multi_remove_latency.init_app_counter(
        "app.pegasus",
        fmt::format("multi_remove_latency@{}", gpid).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of MULTI_REMOVE request");

    _pfc_incr_latency.init_app_counter("app.pegasus",
                                       fmt::format("incr_latency@{}", gpid).c_str(),
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of INCR request");

    _pfc_check_and_set_latency.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_set_latency@{}", gpid).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of CHECK_AND_SET request");

    _pfc_check_and_mutate_latency.init_app_counter(
        "app.pegasus",
        fmt::format("check_and_mutate_latency@{}", gpid).c_str(),
        COUNTER_TYPE_NUMBER_PERCENTILES,
        "statistic the latency of CHECK_AND_MUTATE request");
}

100 101 102
// Destructor with no custom logic; members are destroyed normally.
// NOTE(review): it is defined out-of-line in this file, which includes
// pegasus_write_service_impl.h — presumably so that `impl` is a complete type where
// `_impl` is destroyed. Confirm against the header before moving it.
pegasus_write_service::~pegasus_write_service() = default;

// Forwards an empty put for `decree` to the implementation object and returns the
// implementation's result unchanged. No perf counter is touched here.
int pegasus_write_service::empty_put(int64_t decree)
{
    const int rc = _impl->empty_put(decree);
    return rc;
}
103

W
Wu Tao 已提交
104 105 106
// Handles a MULTI_PUT write.
//
// Bumps the MULTI_PUT qps counter, delegates the write to the implementation object,
// and, when the server reports itself as primary, feeds the response error and the
// written key-values to the capacity-unit calculator. The elapsed time between the two
// dsn_now_ns() readings is recorded in the MULTI_PUT latency counter.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::multi_put(int64_t decree,
                                     const dsn::apps::multi_put_request &update,
                                     dsn::apps::update_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_multi_put_qps->increment();

    const int rc = _impl->multi_put(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_multi_put_cu(resp.error, update.kvs);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_multi_put_latency->set(elapsed_ns);
    return rc;
}

W
Wu Tao 已提交
120 121 122
// Handles a MULTI_REMOVE write.
//
// Bumps the MULTI_REMOVE qps counter, delegates the write to the implementation object,
// and, when the server reports itself as primary, feeds the response error and the
// requested sort keys to the capacity-unit calculator. The elapsed time between the two
// dsn_now_ns() readings is recorded in the MULTI_REMOVE latency counter.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::multi_remove(int64_t decree,
                                        const dsn::apps::multi_remove_request &update,
                                        dsn::apps::multi_remove_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_multi_remove_qps->increment();

    const int rc = _impl->multi_remove(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_multi_remove_cu(resp.error, update.sort_keys);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_multi_remove_latency->set(elapsed_ns);
    return rc;
}

Q
QinZuoyan 已提交
136 137 138 139 140 141 142
// Handles an INCR write.
//
// Bumps the INCR qps counter, delegates the write to the implementation object, and,
// when the server reports itself as primary, reports the response error to the
// capacity-unit calculator. The elapsed time between the two dsn_now_ns() readings is
// recorded in the INCR latency counter.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::incr(int64_t decree,
                                const dsn::apps::incr_request &update,
                                dsn::apps::incr_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_incr_qps->increment();

    const int rc = _impl->incr(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_incr_cu(resp.error);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_incr_latency->set(elapsed_ns);
    return rc;
}

Q
QinZuoyan 已提交
152 153 154 155 156 157 158
// Handles a CHECK_AND_SET write.
//
// Bumps the CHECK_AND_SET qps counter, delegates the write to the implementation object,
// and, when the server reports itself as primary, feeds the response error together with
// the request's set_sort_key and set_value to the capacity-unit calculator. The elapsed
// time between the two dsn_now_ns() readings is recorded in the CHECK_AND_SET latency
// counter.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::check_and_set(int64_t decree,
                                         const dsn::apps::check_and_set_request &update,
                                         dsn::apps::check_and_set_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_check_and_set_qps->increment();

    const int rc = _impl->check_and_set(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_check_and_set_cu(resp.error, update.set_sort_key, update.set_value);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_check_and_set_latency->set(elapsed_ns);
    return rc;
}

168 169 170 171 172 173 174
// Handles a CHECK_AND_MUTATE write.
//
// Bumps the CHECK_AND_MUTATE qps counter, delegates the write to the implementation
// object, and, when the server reports itself as primary, feeds the response error and
// the request's mutate_list to the capacity-unit calculator. The elapsed time between the
// two dsn_now_ns() readings is recorded in the CHECK_AND_MUTATE latency counter.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::check_and_mutate(int64_t decree,
                                            const dsn::apps::check_and_mutate_request &update,
                                            dsn::apps::check_and_mutate_response &resp)
{
    const uint64_t begin_ns = dsn_now_ns();
    _pfc_check_and_mutate_qps->increment();

    const int rc = _impl->check_and_mutate(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_check_and_mutate_cu(resp.error, update.mutate_list);
    }

    const uint64_t elapsed_ns = dsn_now_ns() - begin_ns;
    _pfc_check_and_mutate_latency->set(elapsed_ns);
    return rc;
}

184
void pegasus_write_service::batch_prepare(int64_t decree)
185
{
186 187
    dassert(_batch_start_time == 0,
            "batch_prepare and batch_commit/batch_abort must be called in pair");
188

189
    _batch_start_time = dsn_now_ns();
190 191
}

192 193 194
// Adds a PUT to the currently open batch.
//
// Asserts that batch_prepare was called. Queues the PUT qps and latency counters so they
// are updated when the batch is closed, delegates the write to the implementation object,
// and, when the server reports itself as primary, feeds the response error, key and value
// to the capacity-unit calculator.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::batch_put(int64_t decree,
                                     const dsn::apps::update_request &update,
                                     dsn::apps::update_response &resp)
{
    dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");

    // Counters are only queued here; clear_up_batch_states() applies them later.
    _batch_qps_perfcounters.push_back(_pfc_put_qps.get());
    _batch_latency_perfcounters.push_back(_pfc_put_latency.get());

    const int rc = _impl->batch_put(decree, update, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_put_cu(resp.error, update.key, update.value);
    }
    return rc;
}

209 210 211
// Adds a REMOVE to the currently open batch.
//
// Asserts that batch_prepare was called. Queues the REMOVE qps and latency counters so
// they are updated when the batch is closed, delegates the write to the implementation
// object, and, when the server reports itself as primary, feeds the response error and
// the key to the capacity-unit calculator.
//
// Returns the error code produced by the implementation.
int pegasus_write_service::batch_remove(int64_t decree,
                                        const dsn::blob &key,
                                        dsn::apps::update_response &resp)
{
    dassert(_batch_start_time != 0, "batch_remove must be called after batch_prepare");

    // Counters are only queued here; clear_up_batch_states() applies them later.
    _batch_qps_perfcounters.push_back(_pfc_remove_qps.get());
    _batch_latency_perfcounters.push_back(_pfc_remove_latency.get());

    const int rc = _impl->batch_remove(decree, key, resp);

    // Capacity units are accounted only while this server is primary.
    if (_server->is_primary()) {
        _cu_calculator->add_remove_cu(resp.error, key);
    }
    return rc;
}
225

226 227 228
int pegasus_write_service::batch_commit(int64_t decree)
{
    dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare");
229

230 231 232
    int err = _impl->batch_commit(decree);
    clear_up_batch_states();
    return err;
233 234
}

235
void pegasus_write_service::batch_abort(int64_t decree, int err)
236
{
237 238
    dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare");
    dassert(err, "must abort on non-zero err");
239

240 241
    _impl->batch_abort(decree, err);
    clear_up_batch_states();
242 243
}

244
// Forwards the new default TTL value to the implementation object.
void pegasus_write_service::set_default_ttl(uint32_t ttl)
{
    _impl->set_default_ttl(ttl);
}
245

246
void pegasus_write_service::clear_up_batch_states()
247
{
248 249 250 251 252 253 254 255 256
    uint64_t latency = dsn_now_ns() - _batch_start_time;
    for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
        pfc->increment();
    for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
        pfc->set(latency);

    _batch_qps_perfcounters.clear();
    _batch_latency_perfcounters.clear();
    _batch_start_time = 0;
257 258 259 260
}

} // namespace server
} // namespace pegasus