// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. #include #include #include #include #include #include #include #include #include #include "pegasus_client_impl.h" using namespace ::dsn; namespace pegasus { namespace client { #define ROCSKDB_ERROR_START -1000 std::unordered_map pegasus_client_impl::_client_error_to_string; std::unordered_map pegasus_client_impl::_server_error_to_client; pegasus_client_impl::pegasus_client_impl(const char *cluster_name, const char *app_name) : _cluster_name(cluster_name), _app_name(app_name) { _server_uri = "dsn://" + _cluster_name + "/" + _app_name; _server_uri_address.assign_uri(_server_uri.c_str()); _client = new ::dsn::apps::rrdb_client(_server_uri_address); std::string section = "uri-resolver.dsn://" + _cluster_name; std::string server_list = dsn_config_get_value_string(section.c_str(), "arguments", "", ""); std::vector lv; ::dsn::utils::split_args(server_list.c_str(), lv, ','); std::vector meta_servers; for (auto &s : lv) { ::dsn::rpc_address addr; if (!addr.from_string_ipv4(s.c_str())) { dassert(false, "invalid address '%s' specified in config [%s].arguments", s.c_str(), section.c_str()); } meta_servers.push_back(addr); } dassert(meta_servers.size() > 0, "no meta server specified in config [%s].arguments", section.c_str()); _meta_server.assign_group("meta-servers"); for (auto &ms : meta_servers) { _meta_server.group_address()->add(ms); } } pegasus_client_impl::~pegasus_client_impl() { delete _client; } const char *pegasus_client_impl::get_cluster_name() const { return _cluster_name.c_str(); } const char *pegasus_client_impl::get_app_name() const { return _app_name.c_str(); } int pegasus_client_impl::set(const std::string &hash_key, const std::string &sort_key, const std::string &value, int timeout_milliseconds, int ttl_seconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_set(hash_key, sort_key, value, std::move(callback), timeout_milliseconds, ttl_seconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_set(const std::string &hash_key, const std::string &sort_key, const std::string &value, async_set_callback_t &&callback, int timeout_milliseconds, int ttl_seconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, internal_info()); return; } ::dsn::apps::update_request req; pegasus_generate_key(req.key, hash_key, sort_key); req.value.assign(value.c_str(), 0, value.size()); if (ttl_seconds == 0) req.expire_ts_seconds = 0; else req.expire_ts_seconds = ttl_seconds + utils::epoch_now(); auto partition_hash = pegasus_key_hash(req.key); // wrap the user defined callback function, generate a new callback function. auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } internal_info info; ::dsn::apps::update_response response; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } auto ret = get_client_error( (err == ::dsn::ERR_OK) ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(info)); }; _client->put(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::multi_set(const std::string &hash_key, const std::map &kvs, int timeout_milliseconds, int ttl_seconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_multi_set(hash_key, kvs, std::move(callback), timeout_milliseconds, ttl_seconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_multi_set(const std::string &hash_key, const std::map &kvs, async_multi_set_callback_t &&callback, int timeout_milliseconds, int ttl_seconds) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty for multi_set"); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, internal_info()); return; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, internal_info()); return; } if (kvs.empty()) { derror("invalid kvs: kvs should not be empty"); if (callback != nullptr) callback(PERR_INVALID_VALUE, internal_info()); return; } ::dsn::apps::multi_put_request req; req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size()); for (auto &kv : kvs) { ::dsn::apps::key_value kv_blob; kv_blob.key = ::dsn::blob(kv.first.data(), 0, kv.first.size()); kv_blob.value = ::dsn::blob(kv.second.data(), 0, kv.second.size()); req.kvs.emplace_back(std::move(kv_blob)); } if (ttl_seconds == 0) req.expire_ts_seconds = 0; else req.expire_ts_seconds = ttl_seconds + utils::epoch_now(); ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); // wrap the user-defined-callback-function, generate a new callback function. auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } internal_info info; ::dsn::apps::update_response response; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } auto ret = get_client_error( (err == ::dsn::ERR_OK) ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(info)); }; _client->multi_put(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::get(const std::string &hash_key, const std::string &sort_key, std::string &value, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::string &&str, internal_info &&_info) { ret = err; value = std::move(str); if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_get(hash_key, sort_key, std::move(callback), timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_get(const std::string &hash_key, const std::string &sort_key, async_get_callback_t &&callback, int timeout_milliseconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::string(), internal_info()); return; } ::dsn::blob req; pegasus_generate_key(req, hash_key, sort_key); auto partition_hash = pegasus_key_hash(req); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } std::string value; internal_info info; dsn::apps::read_response response; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); if (response.error == 0) { value.assign(response.value.data(), response.value.length()); } info.app_id = response.app_id; info.partition_index = response.partition_index; info.server = response.server; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(value), std::move(info)); }; _client->get(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::multi_get(const std::string &hash_key, const std::set &sort_keys, std::map &values, int max_fetch_count, int max_fetch_size, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::map &&_values, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); values = std::move(_values); op_completed.notify(); }; async_multi_get(hash_key, sort_keys, std::move(callback), max_fetch_count, max_fetch_size, timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_multi_get(const std::string &hash_key, const std::set &sort_keys, async_multi_get_callback_t &&callback, int max_fetch_count, int max_fetch_size, int timeout_milliseconds) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty"); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::map(), internal_info()); return; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::map(), internal_info()); return; } ::dsn::apps::multi_get_request req; req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size()); req.max_kv_count = max_fetch_count; req.max_kv_size = max_fetch_size; req.start_inclusive = true; req.stop_inclusive = false; for (auto &sort_key : sort_keys) { req.sort_keys.emplace_back(sort_key.data(), 0, sort_key.size()); } ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } std::map values; internal_info info; ::dsn::apps::multi_get_response response; if (err == ::dsn::ERR_OK) { ::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.server = response.server; for (auto &kv : response.kvs) values.emplace(std::string(kv.key.data(), kv.key.length()), std::string(kv.value.data(), kv.value.length())); } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(values), std::move(info)); }; _client->multi_get(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::multi_get(const std::string &hash_key, const std::string &start_sortkey, const std::string &stop_sortkey, const multi_get_options &options, std::map &values, int max_fetch_count, int max_fetch_size, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::map &&_values, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); values = std::move(_values); op_completed.notify(); }; async_multi_get(hash_key, start_sortkey, stop_sortkey, options, std::move(callback), max_fetch_count, max_fetch_size, timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_multi_get(const std::string &hash_key, const std::string &start_sortkey, const std::string &stop_sortkey, const multi_get_options &options, async_multi_get_callback_t &&callback, int max_fetch_count, int max_fetch_size, int timeout_milliseconds) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty"); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::map(), internal_info()); return; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::map(), internal_info()); return; } ::dsn::apps::multi_get_request req; req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size()); req.start_sortkey = ::dsn::blob(start_sortkey.data(), 0, start_sortkey.size()); req.stop_sortkey = ::dsn::blob(stop_sortkey.data(), 0, stop_sortkey.size()); req.start_inclusive = options.start_inclusive; req.stop_inclusive = options.stop_inclusive; req.max_kv_count = max_fetch_count; req.max_kv_size = max_fetch_size; req.no_value = options.no_value; req.reverse = options.reverse; req.sort_key_filter_type = (dsn::apps::filter_type::type)options.sort_key_filter_type; req.sort_key_filter_pattern = ::dsn::blob( options.sort_key_filter_pattern.data(), 0, options.sort_key_filter_pattern.size()); ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } std::map values; internal_info info; ::dsn::apps::multi_get_response response; if (err == ::dsn::ERR_OK) { ::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.server = response.server; for (auto &kv : response.kvs) values.emplace(std::string(kv.key.data(), kv.key.length()), std::string(kv.value.data(), kv.value.length())); } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(values), std::move(info)); }; _client->multi_get(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::multi_get_sortkeys(const std::string &hash_key, std::set &sort_keys, int max_fetch_count, int max_fetch_size, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::set &&_sort_keys, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); sort_keys = std::move(_sort_keys); op_completed.notify(); }; async_multi_get_sortkeys( hash_key, std::move(callback), max_fetch_count, max_fetch_size, timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_multi_get_sortkeys(const std::string &hash_key, async_multi_get_sortkeys_callback_t &&callback, int max_fetch_count, int max_fetch_size, int timeout_milliseconds) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty for multi_get_sortkeys"); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::set(), internal_info()); return; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, std::set(), internal_info()); return; } ::dsn::apps::multi_get_request req; req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size()); req.max_kv_count = max_fetch_count; req.max_kv_size = max_fetch_size; req.no_value = true; ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } std::set sort_keys; internal_info info; ::dsn::apps::multi_get_response response; if (err == ::dsn::ERR_OK) { ::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.server = response.server; for (auto &kv : response.kvs) sort_keys.insert(std::string(kv.key.data(), kv.key.length())); } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(sort_keys), std::move(info)); }; _client->multi_get(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::exist(const std::string &hash_key, const std::string &sort_key, int timeout_milliseconds, internal_info *info) { int ttl_seconds; return ttl(hash_key, sort_key, ttl_seconds, timeout_milliseconds, info); } int pegasus_client_impl::sortkey_count(const std::string &hash_key, int64_t &count, int timeout_milliseconds, internal_info *info) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty for sortkey_count"); return PERR_INVALID_HASH_KEY; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); return PERR_INVALID_HASH_KEY; } ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, hash_key, std::string()); auto partition_hash = pegasus_key_hash(tmp_key); auto pr = _client->sortkey_count_sync(::dsn::blob(hash_key.data(), 0, hash_key.length()), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); if (pr.first == ERR_OK && pr.second.error == 0) { count = pr.second.count; } if (info != nullptr) { if (pr.first == ERR_OK) { info->app_id = pr.second.app_id; info->partition_index = pr.second.partition_index; info->decree = -1; info->server = pr.second.server; } else { info->app_id = -1; info->partition_index = -1; info->decree = -1; } } return get_client_error(pr.first == ERR_OK ? get_rocksdb_server_error(pr.second.error) : int(pr.first)); } int pegasus_client_impl::del(const std::string &hash_key, const std::string &sort_key, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, internal_info &&_info) { ret = err; if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_del(hash_key, sort_key, std::move(callback), timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_del(const std::string &hash_key, const std::string &sort_key, async_del_callback_t &&callback, int timeout_milliseconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, internal_info()); return; } ::dsn::blob req; pegasus_generate_key(req, hash_key, sort_key); auto partition_hash = pegasus_key_hash(req); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } ::dsn::apps::update_response response; internal_info info; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(info)); }; _client->remove(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::multi_del(const std::string &hash_key, const std::set &sort_keys, int64_t &deleted_count, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, int64_t _deleted_count, internal_info &&_info) { ret = err; deleted_count = _deleted_count; if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_multi_del(hash_key, sort_keys, std::move(callback), timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_multi_del(const std::string &hash_key, const std::set &sort_keys, async_multi_del_callback_t &&callback, int timeout_milliseconds) { // check params if (hash_key.size() == 0) { derror("invalid hash key: hash key should not be empty for multi_del"); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, 0, internal_info()); return; } if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, 0, internal_info()); return; } if (sort_keys.empty()) { derror("invalid sort keys: should not be empty"); if (callback != nullptr) callback(PERR_INVALID_VALUE, 0, internal_info()); return; } ::dsn::apps::multi_remove_request req; req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size()); for (auto &sort_key : sort_keys) { req.sort_keys.emplace_back(sort_key.data(), 0, sort_key.size()); } ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } ::dsn::apps::multi_remove_response response; internal_info info; int64_t deleted_count = 0; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; deleted_count = response.count; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, deleted_count, std::move(info)); }; _client->multi_remove(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::incr(const std::string &hash_key, const std::string &sort_key, int64_t increment, int64_t &new_value, int timeout_milliseconds, int ttl_seconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int _err, int64_t _new_value, internal_info &&_info) { ret = _err; new_value = _new_value; if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_incr( hash_key, sort_key, increment, std::move(callback), timeout_milliseconds, ttl_seconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_incr(const std::string &hash_key, const std::string &sort_key, int64_t increment, async_incr_callback_t &&callback, int timeout_milliseconds, int ttl_seconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, 0, internal_info()); return; } if (ttl_seconds < -1) { derror("invalid ttl seconds: should be no less than -1, but %d", ttl_seconds); if (callback != nullptr) callback(PERR_INVALID_ARGUMENT, 0, internal_info()); return; } ::dsn::apps::incr_request req; pegasus_generate_key(req.key, hash_key, sort_key); req.increment = increment; if (ttl_seconds <= 0) req.expire_ts_seconds = ttl_seconds; else req.expire_ts_seconds = ttl_seconds + utils::epoch_now(); auto partition_hash = pegasus_key_hash(req.key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } ::dsn::apps::incr_response response; internal_info info; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, response.new_value, std::move(info)); }; _client->incr(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::check_and_set(const std::string &hash_key, const std::string &check_sort_key, cas_check_type check_type, const std::string &check_operand, const std::string &set_sort_key, const std::string &set_value, const check_and_set_options &options, check_and_set_results &results, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int _err, check_and_set_results &&_results, internal_info &&_info) { ret = _err; results = std::move(_results); if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_check_and_set(hash_key, check_sort_key, check_type, check_operand, set_sort_key, set_value, options, std::move(callback), timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_check_and_set(const std::string &hash_key, const std::string &check_sort_key, cas_check_type check_type, const std::string &check_operand, const std::string &set_sort_key, const std::string &set_value, const check_and_set_options &options, async_check_and_set_callback_t &&callback, int timeout_milliseconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, check_and_set_results(), internal_info()); return; } if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) == dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) { derror("invalid check type: %d", (int)check_type); if (callback != nullptr) callback(PERR_INVALID_ARGUMENT, check_and_set_results(), internal_info()); return; } ::dsn::apps::check_and_set_request req; req.hash_key.assign(hash_key.c_str(), 0, hash_key.size()); req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size()); req.check_type = (dsn::apps::cas_check_type::type)check_type; req.check_operand.assign(check_operand.c_str(), 0, check_operand.size()); if (check_sort_key != set_sort_key) { req.set_diff_sort_key = true; req.set_sort_key.assign(set_sort_key.c_str(), 0, set_sort_key.size()); } req.set_value.assign(set_value.c_str(), 0, set_value.size()); if (options.set_value_ttl_seconds == 0) req.set_expire_ts_seconds = 0; else req.set_expire_ts_seconds = options.set_value_ttl_seconds + utils::epoch_now(); req.return_check_value = options.return_check_value; ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } check_and_set_results results; internal_info info; ::dsn::apps::check_and_set_response response; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); if (response.error == 0) { results.set_succeed = true; } else if (response.error == 13) { // kTryAgain results.set_succeed = false; response.error = 0; } else { results.set_succeed = false; } if (response.check_value_returned) { results.check_value_returned = true; if (response.check_value_exist) { results.check_value_exist = true; results.check_value.assign(response.check_value.data(), response.check_value.length()); } } info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(results), std::move(info)); }; _client->check_and_set(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::check_and_mutate(const std::string &hash_key, const std::string &check_sort_key, cas_check_type check_type, const std::string &check_operand, const mutations &mutations, const check_and_mutate_options &options, check_and_mutate_results &results, int timeout_milliseconds, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int _err, check_and_mutate_results &&_results, internal_info &&_info) { ret = _err; results = std::move(_results); if (info != nullptr) (*info) = std::move(_info); op_completed.notify(); }; async_check_and_mutate(hash_key, check_sort_key, check_type, check_operand, mutations, options, std::move(callback), timeout_milliseconds); op_completed.wait(); return ret; } void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key, const std::string &check_sort_key, cas_check_type check_type, const std::string &check_operand, const mutations &mutations, const check_and_mutate_options &options, async_check_and_mutate_callback_t &&callback, int timeout_milliseconds) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); if (callback != nullptr) callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info()); return; } if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) == dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) { derror("invalid check type: %d", (int)check_type); if (callback != nullptr) callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info()); return; } if (mutations.is_empty()) { derror("invalid mutations: mutations should not be empty."); if (callback != nullptr) callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info()); return; } ::dsn::apps::check_and_mutate_request req; req.hash_key.assign(hash_key.c_str(), 0, hash_key.size()); req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size()); req.check_type = (dsn::apps::cas_check_type::type)check_type; req.check_operand.assign(check_operand.c_str(), 0, check_operand.size()); std::vector mutate_list; mutations.get_mutations(mutate_list); req.mutate_list.resize(mutate_list.size()); for (int i = 0; i < mutate_list.size(); ++i) { auto &mu = mutate_list[i]; req.mutate_list[i].operation = (dsn::apps::mutate_operation::type)mu.operation; req.mutate_list[i].sort_key = blob::create_from_bytes(std::move(mu.sort_key)); if (mu.operation == mutate::mutate_operation::MO_PUT) { req.mutate_list[i].value = blob::create_from_bytes(std::move(mu.value)); req.mutate_list[i].set_expire_ts_seconds = mu.set_expire_ts_seconds; } } req.return_check_value = options.return_check_value; ::dsn::blob tmp_key; pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); auto partition_hash = pegasus_key_hash(tmp_key); auto new_callback = [user_callback = std::move(callback)]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { if (user_callback == nullptr) { return; } check_and_mutate_results results; internal_info info; ::dsn::apps::check_and_mutate_response response; if (err == ::dsn::ERR_OK) { ::dsn::unmarshall(resp, response); if (response.error == 0) { results.mutate_succeed = true; } else if (response.error == 13) { // kTryAgain results.mutate_succeed = false; response.error = 0; } else { results.mutate_succeed = false; } if (response.check_value_returned) { results.check_value_returned = true; if (response.check_value_exist) { results.check_value_exist = true; results.check_value.assign(response.check_value.data(), response.check_value.length()); } } info.app_id = response.app_id; info.partition_index = response.partition_index; info.decree = response.decree; info.server = response.server; } int ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); user_callback(ret, std::move(results), std::move(info)); }; _client->check_and_mutate(req, std::move(new_callback), std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); } int pegasus_client_impl::ttl(const std::string &hash_key, const std::string &sort_key, int &ttl_seconds, int timeout_milliseconds, internal_info *info) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); return PERR_INVALID_HASH_KEY; } ::dsn::blob req; pegasus_generate_key(req, hash_key, sort_key); auto partition_hash = pegasus_key_hash(req); auto pr = _client->ttl_sync(req, std::chrono::milliseconds(timeout_milliseconds), 0, partition_hash); if (pr.first == ERR_OK && pr.second.error == 0) { ttl_seconds = pr.second.ttl_seconds; } if (info != nullptr) { if (pr.first == ERR_OK) { info->app_id = pr.second.app_id; info->partition_index = pr.second.partition_index; info->decree = -1; info->server = pr.second.server; } else { info->app_id = -1; info->partition_index = -1; info->decree = -1; } } return get_client_error(pr.first == ERR_OK ? get_rocksdb_server_error(pr.second.error) : int(pr.first)); } void pegasus_client_impl::async_get_scanner(const std::string &hash_key, const std::string &start_sortkey, const std::string &stop_sortkey, const scan_options &options, async_get_scanner_callback_t &&callback) { if (callback) { pegasus_scanner *scanner; int ret = get_scanner(hash_key, start_sortkey, stop_sortkey, options, scanner); callback(ret, scanner); } } int pegasus_client_impl::get_scanner(const std::string &hash_key, const std::string &start_sort_key, const std::string &stop_sort_key, const scan_options &options, pegasus_scanner *&scanner) { // check params if (hash_key.size() >= UINT16_MAX) { derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", (int)hash_key.size()); return PERR_INVALID_HASH_KEY; } if (hash_key.empty()) { derror("invalid hash key: hash key cannot be empty when scan"); return PERR_INVALID_HASH_KEY; } ::dsn::blob start; ::dsn::blob stop; scan_options o(options); // generate key range by start_sort_key and stop_sort_key pegasus_generate_key(start, hash_key, start_sort_key); if (stop_sort_key.empty()) { pegasus_generate_next_blob(stop, hash_key); o.stop_inclusive = false; } else { pegasus_generate_key(stop, hash_key, stop_sort_key); } // limit key range by prefix filter if (o.sort_key_filter_type == filter_type::FT_MATCH_PREFIX && o.sort_key_filter_pattern.length() > 0) { ::dsn::blob prefix_start, prefix_stop; pegasus_generate_key(prefix_start, hash_key, o.sort_key_filter_pattern); pegasus_generate_next_blob(prefix_stop, hash_key, o.sort_key_filter_pattern); if (::dsn::string_view(prefix_start).compare(start) > 0) { start = std::move(prefix_start); o.start_inclusive = true; } if (::dsn::string_view(prefix_stop).compare(stop) <= 0) { stop = std::move(prefix_stop); o.stop_inclusive = false; } } // check if range is empty std::vector v; int c = ::dsn::string_view(start).compare(stop); if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) { v.push_back(pegasus_key_hash(start)); } scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop); return PERR_OK; } DEFINE_TASK_CODE_RPC(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT) void pegasus_client_impl::async_get_unordered_scanners( int max_split_count, const scan_options &options, async_get_unordered_scanners_callback_t &&callback) { if (!callback) { return; } // check params if (max_split_count <= 0) { derror("invalid max_split_count: which should be greater than 0, but %d", max_split_count); callback(PERR_INVALID_SPLIT_COUNT, std::vector()); return; } auto new_callback = [ user_callback = std::move(callback), max_split_count, options, this ]( ::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) { std::vector scanners; configuration_query_by_index_response response; if (err == ERR_OK) { ::dsn::unmarshall(resp, response); if (response.err == ERR_OK) { unsigned int count = response.partition_count; int split = count < max_split_count ? count : max_split_count; scanners.resize(split); int size = count / split; int more = count - size * split; for (int i = 0; i < split; i++) { int s = size + (i < more); std::vector hash(s); for (int j = 0; j < s; j++) hash[j] = --count; scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options); } } } int ret = get_client_error(err == ERR_OK ? int(response.err) : int(err)); user_callback(ret, std::move(scanners)); }; configuration_query_by_index_request req; req.app_name = _app_name; ::dsn::rpc::call(_meta_server, RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, req, nullptr, new_callback, std::chrono::milliseconds(options.timeout_ms), 0, 0); } int pegasus_client_impl::get_unordered_scanners(int max_split_count, const scan_options &options, std::vector &scanners) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::vector &&ss) { ret = err; scanners = std::move(ss); op_completed.notify(); }; async_get_unordered_scanners(max_split_count, options, std::move(callback)); op_completed.wait(); return ret; } const char *pegasus_client_impl::get_error_string(int error_code) const { auto it = _client_error_to_string.find(error_code); dassert( it != _client_error_to_string.end(), "client error %d have no error string", error_code); return it->second.c_str(); } /*static*/ void pegasus_client_impl::init_error() { _client_error_to_string.clear(); #define PEGASUS_ERR_CODE(x, y, z) _client_error_to_string[y] = z #include #undef PEGASUS_ERR_CODE _server_error_to_client.clear(); _server_error_to_client[::dsn::ERR_OK] = PERR_OK; _server_error_to_client[::dsn::ERR_TIMEOUT] = PERR_TIMEOUT; _server_error_to_client[::dsn::ERR_FILE_OPERATION_FAILED] = PERR_SERVER_INTERNAL_ERROR; _server_error_to_client[::dsn::ERR_INVALID_STATE] = PERR_SERVER_CHANGED; _server_error_to_client[::dsn::ERR_OBJECT_NOT_FOUND] = PERR_OBJECT_NOT_FOUND; _server_error_to_client[::dsn::ERR_NETWORK_FAILURE] = PERR_NETWORK_FAILURE; _server_error_to_client[::dsn::ERR_HANDLER_NOT_FOUND] = PERR_HANDLER_NOT_FOUND; _server_error_to_client[::dsn::ERR_OPERATION_DISABLED] = PERR_OPERATION_DISABLED; _server_error_to_client[::dsn::ERR_NOT_ENOUGH_MEMBER] = PERR_NOT_ENOUGH_MEMBER; _server_error_to_client[::dsn::ERR_APP_NOT_EXIST] = PERR_APP_NOT_EXIST; _server_error_to_client[::dsn::ERR_APP_EXIST] = PERR_APP_EXIST; _server_error_to_client[::dsn::ERR_BUSY] = PERR_APP_BUSY; // rocksdb error; for (int i = 1001; i < 1013; i++) { _server_error_to_client[-i] = -i; } } /*static*/ int pegasus_client_impl::get_client_error(int server_error) { auto it = _server_error_to_client.find(server_error); if (it != _server_error_to_client.end()) return it->second; derror("can't find corresponding client error definition, server error:[%d:%s]", server_error, ::dsn::error_code(server_error).to_string()); return PERR_UNKNOWN; } /*static*/ int pegasus_client_impl::get_rocksdb_server_error(int rocskdb_error) { return (rocskdb_error == 0) ? 0 : ROCSKDB_ERROR_START - rocskdb_error; } } // namespace client } // namespace pegasus