pegasus_write_service.cpp 7.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "pegasus_write_service.h"
#include "pegasus_write_service_impl.h"

namespace pegasus {
namespace server {

pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
    : _impl(new impl(server)), _batch_start_time(0)
{
    std::string str_gpid = fmt::format("{}", server->get_gpid());

    std::string name;

    name = fmt::format("put_qps@{}", str_gpid);
    _pfc_put_qps.init_app_counter(
        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request");

    name = fmt::format("multi_put_qps@{}", str_gpid);
    _pfc_multi_put_qps.init_app_counter(
        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request");

    name = fmt::format("remove_qps@{}", str_gpid);
    _pfc_remove_qps.init_app_counter(
        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request");

    name = fmt::format("multi_remove_qps@{}", str_gpid);
    _pfc_multi_remove_qps.init_app_counter("app.pegasus",
                                           name.c_str(),
                                           COUNTER_TYPE_RATE,
                                           "statistic the qps of MULTI_REMOVE request");

Q
QinZuoyan 已提交
36 37 38 39
    name = fmt::format("incr_qps@{}", str_gpid);
    _pfc_incr_qps.init_app_counter(
        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request");

Q
QinZuoyan 已提交
40 41 42 43 44 45
    name = fmt::format("check_and_set_qps@{}", str_gpid);
    _pfc_check_and_set_qps.init_app_counter("app.pegasus",
                                            name.c_str(),
                                            COUNTER_TYPE_RATE,
                                            "statistic the qps of CHECK_AND_SET request");

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
    name = fmt::format("put_latency@{}", str_gpid);
    _pfc_put_latency.init_app_counter("app.pegasus",
                                      name.c_str(),
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of PUT request");

    name = fmt::format("multi_put_latency@{}", str_gpid);
    _pfc_multi_put_latency.init_app_counter("app.pegasus",
                                            name.c_str(),
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_PUT request");

    name = fmt::format("remove_latency@{}", str_gpid);
    _pfc_remove_latency.init_app_counter("app.pegasus",
                                         name.c_str(),
                                         COUNTER_TYPE_NUMBER_PERCENTILES,
                                         "statistic the latency of REMOVE request");

    name = fmt::format("multi_remove_latency@{}", str_gpid);
    _pfc_multi_remove_latency.init_app_counter("app.pegasus",
                                               name.c_str(),
                                               COUNTER_TYPE_NUMBER_PERCENTILES,
                                               "statistic the latency of MULTI_REMOVE request");
Q
QinZuoyan 已提交
69 70 71 72 73 74

    name = fmt::format("incr_latency@{}", str_gpid);
    _pfc_incr_latency.init_app_counter("app.pegasus",
                                       name.c_str(),
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of INCR request");
Q
QinZuoyan 已提交
75 76 77 78 79 80

    name = fmt::format("check_and_set_latency@{}", str_gpid);
    _pfc_check_and_set_latency.init_app_counter("app.pegasus",
                                                name.c_str(),
                                                COUNTER_TYPE_NUMBER_PERCENTILES,
                                                "statistic the latency of CHECK_AND_SET request");
81 82
}

83 84 85
pegasus_write_service::~pegasus_write_service() {}

int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); }
86

W
Wu Tao 已提交
87 88 89
int pegasus_write_service::multi_put(int64_t decree,
                                     const dsn::apps::multi_put_request &update,
                                     dsn::apps::update_response &resp)
90 91 92
{
    uint64_t start_time = dsn_now_ns();
    _pfc_multi_put_qps->increment();
W
Wu Tao 已提交
93
    int err = _impl->multi_put(decree, update, resp);
94
    _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
W
Wu Tao 已提交
95
    return err;
96 97
}

W
Wu Tao 已提交
98 99 100
int pegasus_write_service::multi_remove(int64_t decree,
                                        const dsn::apps::multi_remove_request &update,
                                        dsn::apps::multi_remove_response &resp)
101 102 103
{
    uint64_t start_time = dsn_now_ns();
    _pfc_multi_remove_qps->increment();
W
Wu Tao 已提交
104
    int err = _impl->multi_remove(decree, update, resp);
105
    _pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
W
Wu Tao 已提交
106
    return err;
107 108
}

Q
QinZuoyan 已提交
109 110 111 112 113 114 115 116 117 118 119
int pegasus_write_service::incr(int64_t decree,
                                const dsn::apps::incr_request &update,
                                dsn::apps::incr_response &resp)
{
    uint64_t start_time = dsn_now_ns();
    _pfc_incr_qps->increment();
    int err = _impl->incr(decree, update, resp);
    _pfc_incr_latency->set(dsn_now_ns() - start_time);
    return err;
}

Q
QinZuoyan 已提交
120 121 122 123 124 125 126 127 128 129 130
int pegasus_write_service::check_and_set(int64_t decree,
                                         const dsn::apps::check_and_set_request &update,
                                         dsn::apps::check_and_set_response &resp)
{
    uint64_t start_time = dsn_now_ns();
    _pfc_check_and_set_qps->increment();
    int err = _impl->check_and_set(decree, update, resp);
    _pfc_check_and_set_latency->set(dsn_now_ns() - start_time);
    return err;
}

131
void pegasus_write_service::batch_prepare(int64_t decree)
132
{
133 134
    dassert(_batch_start_time == 0,
            "batch_prepare and batch_commit/batch_abort must be called in pair");
135

136
    _batch_start_time = dsn_now_ns();
137 138
}

139 140 141
int pegasus_write_service::batch_put(int64_t decree,
                                     const dsn::apps::update_request &update,
                                     dsn::apps::update_response &resp)
142
{
143 144 145 146
    dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");

    _batch_qps_perfcounters.push_back(_pfc_put_qps.get());
    _batch_latency_perfcounters.push_back(_pfc_put_latency.get());
147

148
    return _impl->batch_put(decree, update, resp);
149 150
}

151 152 153
int pegasus_write_service::batch_remove(int64_t decree,
                                        const dsn::blob &key,
                                        dsn::apps::update_response &resp)
154
{
155
    dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");
156

157 158
    _batch_qps_perfcounters.push_back(_pfc_remove_qps.get());
    _batch_latency_perfcounters.push_back(_pfc_remove_latency.get());
159

160 161
    return _impl->batch_remove(decree, key, resp);
}
162

163 164 165
int pegasus_write_service::batch_commit(int64_t decree)
{
    dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare");
166

167 168 169
    int err = _impl->batch_commit(decree);
    clear_up_batch_states();
    return err;
170 171
}

172
void pegasus_write_service::batch_abort(int64_t decree, int err)
173
{
174 175
    dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare");
    dassert(err, "must abort on non-zero err");
176

177 178
    _impl->batch_abort(decree, err);
    clear_up_batch_states();
179 180
}

181
void pegasus_write_service::clear_up_batch_states()
182
{
183 184 185 186 187 188 189 190 191
    uint64_t latency = dsn_now_ns() - _batch_start_time;
    for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
        pfc->increment();
    for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
        pfc->set(latency);

    _batch_qps_perfcounters.clear();
    _batch_latency_perfcounters.clear();
    _batch_start_time = 0;
192 193 194 195
}

} // namespace server
} // namespace pegasus