提交 b0bd9f52 编写于 作者: Y Yingchun Lai 提交者: Wu Tao

GEO: add GEOADD API for redis proxy to support full GEO function (#381)

上级 72082493
......@@ -65,14 +65,13 @@ int main(int argc, char **argv)
// generate data for test
if (gen_data) {
const pegasus::geo::latlng_codec &codec = my_geo.get_codec();
for (int i = 0; i < data_count; ++i) {
std::string value;
S2LatLng latlng(S2Testing::SamplePoint(rect));
std::string id = std::to_string(i);
std::string value = id + "|2018-06-05 12:00:00|2018-06-05 13:00:00|abcdefg|" +
std::to_string(latlng.lng().degrees()) + "|" +
std::to_string(latlng.lat().degrees()) + "|123.456|456.789|0|-1";
int ret = my_geo.set(id, "", value, 1000);
bool ok = codec.encode_to_value(latlng.lat().degrees(), latlng.lng().degrees(), value);
assert(ok);
int ret = my_geo.set(std::to_string(i), "", value, 1000);
if (ret != pegasus::PERR_OK) {
std::cerr << "set data failed. error=" << ret << std::endl;
}
......
......@@ -63,7 +63,7 @@ geo_client::geo_client(const char *config_file,
uint32_t longitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "longitude_index", 4, "longitude index in value");
dsn::error_s s = _extractor.set_latlng_indices(latitude_index, longitude_index);
dsn::error_s s = _codec.set_latlng_indices(latitude_index, longitude_index);
dassert_f(s.is_ok(), "set_latlng_indices({}, {}) failed", latitude_index, longitude_index);
}
......@@ -161,6 +161,23 @@ void geo_client::async_set(const std::string &hash_key,
timeout_ms);
}
void geo_client::async_set(const std::string &hash_key,
const std::string &sort_key,
double lat_degrees,
double lng_degrees,
pegasus_client::async_set_callback_t &&callback,
int timeout_ms,
int ttl_seconds)
{
std::string value;
if (!_codec.encode_to_value(lat_degrees, lng_degrees, value)) {
callback(PERR_GEO_INVALID_LATLNG_ERROR, {});
return;
}
async_set(hash_key, sort_key, value, std::move(callback), timeout_ms, ttl_seconds);
}
int geo_client::get(const std::string &hash_key,
const std::string &sort_key,
double &lat_degrees,
......@@ -204,8 +221,8 @@ void geo_client::async_get(const std::string &hash_key,
return;
}
S2LatLng latlng;
if (!_extractor.extract_from_value(value_, latlng)) {
derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}",
if (!_codec.decode_from_value(value_, latlng)) {
derror_f("decode_from_value failed. hash_key={}, sort_key={}, value={}",
hash_key,
sort_key,
value_);
......@@ -466,8 +483,8 @@ void geo_client::async_search_radial(const std::string &hash_key,
}
S2LatLng latlng;
if (!_extractor.extract_from_value(value_, latlng)) {
derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}",
if (!_codec.decode_from_value(value_, latlng)) {
derror_f("decode_from_value failed. hash_key={}, sort_key={}, value={}",
hash_key,
sort_key,
value_);
......@@ -658,8 +675,8 @@ bool geo_client::generate_geo_keys(const std::string &hash_key,
{
// extract latitude and longitude from value
S2LatLng latlng;
if (!_extractor.extract_from_value(value, latlng)) {
derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}",
if (!_codec.decode_from_value(value, latlng)) {
derror_f("decode_from_value failed. hash_key={}, sort_key={}, value={}",
hash_key,
sort_key,
value);
......@@ -841,8 +858,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
}
S2LatLng latlng;
if (!_extractor.extract_from_value(value, latlng)) {
derror_f("extract_from_value failed. value={}", value);
if (!_codec.decode_from_value(value, latlng)) {
derror_f("decode_from_value failed. value={}", value);
cb();
return;
}
......@@ -928,8 +945,8 @@ void geo_client::async_distance(const std::string &hash_key1,
}
S2LatLng latlng;
if (!_extractor.extract_from_value(value_, latlng)) {
derror_f("extract_from_value failed. value={}", value_);
if (!_codec.decode_from_value(value_, latlng)) {
derror_f("decode_from_value failed. value={}", value_);
*ret = PERR_GEO_DECODE_VALUE_ERROR;
}
......
......@@ -10,7 +10,7 @@
#include <s2/util/units/length-units.h>
#include <dsn/tool-api/task_tracker.h>
#include <pegasus/client.h>
#include "latlng_extractor.h"
#include "latlng_codec.h"
namespace dsn {
class error_s;
......@@ -29,7 +29,7 @@ using get_latlng_callback_t =
/// the search result structure used by `search_radial` APIs
struct SearchResult
{
double lat_degrees; // latitude and longitude extract by `latlng_extractor`, in degree
double lat_degrees; // latitude and longitude extract by `latlng_codec`, in degree
double lng_degrees;
double distance; // distance from the input and the result, in meter
std::string hash_key; // the original hash_key, sort_key, and value when data inserted
......@@ -107,7 +107,7 @@ public:
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
/// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_extractor
/// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_codec
int set(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
......@@ -122,6 +122,14 @@ public:
int timeout_ms = 5000,
int ttl_seconds = 0);
void async_set(const std::string &hash_key,
const std::string &sort_key,
double lat_degrees,
double lng_degrees,
pegasus_client::async_set_callback_t &&callback = nullptr,
int timeout_ms = 5000,
int ttl_seconds = 0);
///
/// \brief get
/// get latitude and longitude of key pair from the cluster
......@@ -141,7 +149,7 @@ public:
/// this error can be converted to a string using get_error_string()
///
/// REQUIRES: value of this key can be correctly extracted to latitude and longitude by
/// latlng_extractor
/// latlng_codec
int get(const std::string &hash_key,
const std::string &sort_key,
double &lat_degrees,
......@@ -164,7 +172,7 @@ public:
/// if wait longer than this value, will return timeout error
///
/// REQUIRES: value of this key can be correctly extracted to latitude and longitude by
/// latlng_extractor
/// latlng_codec
void async_get(const std::string &hash_key,
const std::string &sort_key,
int id,
......@@ -216,7 +224,7 @@ public:
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
/// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_extractor
/// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_codec
int set_geo_data(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
......@@ -290,7 +298,7 @@ public:
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
/// REQUIRES: latitude and longitude can be correctly extracted by latlng_extractor from the
/// REQUIRES: latitude and longitude can be correctly extracted by latlng_codec from the
/// value corresponding to `hash_key` and `sort_key`
int search_radial(const std::string &hash_key,
const std::string &sort_key,
......@@ -341,6 +349,9 @@ public:
dsn::error_s set_max_level(int level);
// For test.
const latlng_codec &get_codec() const { return _codec; }
private:
friend class geo_client_test;
......@@ -358,7 +369,7 @@ private:
// generate hash_key and sort_key in geo database from hash_key and sort_key in common data
// database
// geo hash_key is the prefix of cell id which is calculated from value by `_extractor`, its
// geo hash_key is the prefix of cell id which is calculated from value by `_codec`, its
// length is associated with `_min_level`
// geo sort_key is composed with the postfix of the same cell id and origin hash_key and
// sort_key
......@@ -459,7 +470,7 @@ private:
dsn::task_tracker _tracker;
latlng_extractor _extractor;
latlng_codec _codec;
pegasus_client *_common_data_client = nullptr;
pegasus_client *_geo_data_client = nullptr;
};
......
......@@ -2,8 +2,10 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "latlng_extractor.h"
#include "latlng_codec.h"
#include <dsn/service_api_cpp.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/error_code.h>
#include <dsn/utility/errors.h>
#include <dsn/utility/string_conv.h>
......@@ -39,16 +41,17 @@ void extract_indices(const std::string &line,
}
}
bool latlng_extractor::extract_from_value(const std::string &value, S2LatLng &latlng)
bool latlng_codec::decode_from_value(const std::string &value, S2LatLng &latlng) const
{
assert(_sorted_indices.size() == 2);
std::vector<std::string> data;
extract_indices(value, _sorted_indices, data, '|');
if (data.size() != 2) {
return false;
}
std::string &lat = data[_latlng_reversed ? 1 : 0];
std::string &lng = data[_latlng_reversed ? 0 : 1];
std::string &lat = data[_latlng_order ? 0 : 1];
std::string &lng = data[_latlng_order ? 1 : 0];
double lat_degrees = 0.0;
double lng_degrees = 0.0;
if (!dsn::buf2double(lat, lat_degrees) || !dsn::buf2double(lng, lng_degrees)) {
......@@ -59,17 +62,44 @@ bool latlng_extractor::extract_from_value(const std::string &value, S2LatLng &la
return latlng.is_valid();
}
dsn::error_s latlng_extractor::set_latlng_indices(uint32_t latitude_index, uint32_t longitude_index)
bool latlng_codec::encode_to_value(double lat_degrees, double lng_degrees, std::string &value) const
{
assert(_sorted_indices.size() == 2);
S2LatLng latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees);
if (!latlng.is_valid()) {
derror_f("latlng is invalid. lat_degrees={}, lng_degrees={}", lat_degrees, lng_degrees);
return false;
}
value.clear();
int index = 0;
for (int i = 0; i <= _sorted_indices[1]; ++i) {
if (i == _sorted_indices[index]) {
if ((index == 0 && _latlng_order) || (index == 1 && !_latlng_order)) {
value += std::to_string(latlng.lat().degrees());
} else {
value += std::to_string(latlng.lng().degrees());
}
++index;
}
if (i != _sorted_indices[1]) {
value += '|';
}
}
return true;
}
dsn::error_s latlng_codec::set_latlng_indices(uint32_t latitude_index, uint32_t longitude_index)
{
if (latitude_index == longitude_index) {
return dsn::error_s::make(dsn::ERR_INVALID_PARAMETERS,
"latitude_index and longitude_index should not be equal");
} else if (latitude_index < longitude_index) {
_sorted_indices = {(int)latitude_index, (int)longitude_index};
_latlng_reversed = false;
_latlng_order = true;
} else {
_sorted_indices = {(int)longitude_index, (int)latitude_index};
_latlng_reversed = true;
_latlng_order = false;
}
return dsn::error_s::ok();
}
......
......@@ -16,12 +16,16 @@ class error_s;
namespace pegasus {
namespace geo {
class latlng_extractor
class latlng_codec
{
public:
// Extract latitude and longitude from value.
// Decode latitude and longitude from string type value.
// Return true when succeed.
bool extract_from_value(const std::string &value, S2LatLng &latlng);
bool decode_from_value(const std::string &value, S2LatLng &latlng) const;
// Encode latitude and longitude into string type value.
// Return true when succeed.
bool encode_to_value(double lat_degrees, double lng_degrees, std::string &value) const;
// Set latitude and longitude indices in string type value, indices are the ones
// when the string type value split into list by '|'.
......@@ -31,7 +35,7 @@ private:
// Latitude index and longitude index in sorted order.
std::vector<int> _sorted_indices;
// Whether '_sorted_indices' is in latitude-longitude order.
bool _latlng_reversed = false;
bool _latlng_order = true;
};
} // namespace geo
......
......@@ -4,24 +4,24 @@
#include <memory>
#include <gtest/gtest.h>
#include <geo/lib/latlng_extractor.h>
#include <geo/lib/latlng_codec.h>
#include <dsn/utility/errors.h>
namespace pegasus {
namespace geo {
TEST(latlng_extractor_test, set_latlng_indices)
TEST(latlng_codec_test, set_latlng_indices)
{
latlng_extractor extractor;
ASSERT_FALSE(extractor.set_latlng_indices(3, 3).is_ok());
ASSERT_TRUE(extractor.set_latlng_indices(3, 4).is_ok());
ASSERT_TRUE(extractor.set_latlng_indices(4, 3).is_ok());
latlng_codec codec;
ASSERT_FALSE(codec.set_latlng_indices(3, 3).is_ok());
ASSERT_TRUE(codec.set_latlng_indices(3, 4).is_ok());
ASSERT_TRUE(codec.set_latlng_indices(4, 3).is_ok());
}
TEST(latlng_extractor_for_lbs_test, extract_from_value)
TEST(latlng_codec_for_lbs_test, decode_from_value)
{
latlng_extractor extractor;
ASSERT_TRUE(extractor.set_latlng_indices(5, 4).is_ok());
latlng_codec codec;
ASSERT_TRUE(codec.set_latlng_indices(5, 4).is_ok());
double lat_degrees = 12.345;
double lng_degrees = 67.890;
......@@ -30,83 +30,120 @@ TEST(latlng_extractor_for_lbs_test, extract_from_value)
std::string test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) +
"|24.043028|4.15921|0|-1";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "|2018-04-26|2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e||2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) + "||4.15921|0|-1";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) +
"|24.043028|4.15921|0|";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr||" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "||24.043028|4.15921|0|-1";
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|||24.043028|4.15921|0|-1";
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
}
TEST(latlng_extractor_for_aibox_test, extract_from_value)
TEST(latlng_codec_for_aibox_test, decode_from_value)
{
latlng_extractor extractor;
ASSERT_TRUE(extractor.set_latlng_indices(0, 1).is_ok());
latlng_codec codec;
ASSERT_TRUE(codec.set_latlng_indices(0, 1).is_ok());
double lat_degrees = 12.345;
double lng_degrees = 67.890;
S2LatLng latlng;
std::string test_value = std::to_string(lat_degrees) + "|" + std::to_string(lng_degrees);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = std::to_string(lat_degrees) + "|" + std::to_string(lng_degrees) + "|24.043028";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = std::to_string(lat_degrees) + "|" + std::to_string(lng_degrees) + "||";
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
ASSERT_TRUE(codec.decode_from_value(test_value, latlng));
ASSERT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
ASSERT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "|" + std::to_string(lat_degrees) + "|" + std::to_string(lng_degrees);
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
test_value = "|" + std::to_string(lat_degrees);
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
test_value = std::to_string(lng_degrees) + "|";
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
test_value = "|";
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
ASSERT_FALSE(codec.decode_from_value(test_value, latlng));
}
TEST(latlng_codec_encode_test, encode_to_value)
{
latlng_codec codec;
double lat_degrees = 12.345;
double lng_degrees = 67.890;
std::string value;
ASSERT_TRUE(codec.set_latlng_indices(0, 1).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("12.345000|67.890000", value);
ASSERT_TRUE(codec.set_latlng_indices(1, 0).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("67.890000|12.345000", value);
ASSERT_TRUE(codec.set_latlng_indices(4, 5).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("||||12.345000|67.890000", value);
ASSERT_TRUE(codec.set_latlng_indices(5, 4).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("||||67.890000|12.345000", value);
ASSERT_TRUE(codec.set_latlng_indices(0, 3).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("12.345000|||67.890000", value);
ASSERT_TRUE(codec.set_latlng_indices(3, 1).is_ok());
ASSERT_TRUE(codec.encode_to_value(lat_degrees, lng_degrees, value));
ASSERT_EQ("|67.890000||12.345000", value);
ASSERT_FALSE(codec.encode_to_value(91, lng_degrees, value));
ASSERT_FALSE(codec.encode_to_value(-91, lng_degrees, value));
ASSERT_FALSE(codec.encode_to_value(lat_degrees, 181, value));
ASSERT_FALSE(codec.encode_to_value(lat_degrees, -181, value));
}
} // namespace geo
} // namespace pegasus
......@@ -29,6 +29,7 @@ std::unordered_map<std::string, redis_parser::redis_call_handler> redis_parser::
{"SETEX", redis_parser::g_setex},
{"TTL", redis_parser::g_ttl},
{"PTTL", redis_parser::g_ttl},
{"GEOADD", redis_parser::g_geo_add},
{"GEODIST", redis_parser::g_geo_dist},
{"GEOPOS", redis_parser::g_geo_pos},
{"GEORADIUS", redis_parser::g_geo_radius},
......@@ -1140,6 +1141,62 @@ void redis_parser::process_geo_radius_result(message_entry &entry,
}
}
// command format:
// GEOADD key longitude latitude member [longitude latitude member ...]
void redis_parser::geo_add(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 5 || (redis_request.sub_requests.size() - 2) % 3 != 0) {
simple_error_reply(entry, "wrong number of arguments for 'geoadd' command");
return;
}
int member_count = (redis_request.sub_requests.size() - 2) / 3;
std::shared_ptr<proxy_session> ref_this = shared_from_this();
std::shared_ptr<std::atomic<int32_t>> set_count =
std::make_shared<std::atomic<int32_t>>(member_count);
std::shared_ptr<redis_integer> result(new redis_integer());
auto set_latlng_callback = [ref_this, this, &entry, result, set_count](
int error_code, pegasus_client::internal_info &&info) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: GEOADD command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != error_code) {
if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
return;
}
++(result->value);
if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
};
for (int i = 0; i < member_count; ++i) {
dsn::string_view lng_degree_str(redis_request.sub_requests[2 + i * 3].data);
dsn::string_view lat_degree_str(redis_request.sub_requests[2 + i * 3 + 1].data);
double lng_degree;
double lat_degree;
if (dsn::buf2double(lng_degree_str, lng_degree) &&
dsn::buf2double(lat_degree_str, lat_degree)) {
const std::string &hashkey = redis_request.sub_requests[2 + i * 3 + 2].data.to_string();
_geo_client->async_set(hashkey, "", lat_degree, lng_degree, set_latlng_callback, 2000);
} else if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
}
}
// command format:
// GEODIST key member1 member2 [unit]
void redis_parser::geo_dist(message_entry &entry)
......
......@@ -158,6 +158,7 @@ protected:
DECLARE_REDIS_HANDLER(del)
DECLARE_REDIS_HANDLER(setex)
DECLARE_REDIS_HANDLER(ttl)
DECLARE_REDIS_HANDLER(geo_add)
DECLARE_REDIS_HANDLER(geo_dist)
DECLARE_REDIS_HANDLER(geo_pos)
DECLARE_REDIS_HANDLER(geo_radius)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册