提交 c3fc992e 编写于 作者: Y Yingchun Lai 提交者: Wu Tao

GEO: Support more configuration for geo_client (#357)

上级 bfc34214
......@@ -11,6 +11,7 @@
#include <monitoring/histogram.h>
#include <rocksdb/env.h>
#include <dsn/utility/errors.h>
#include <dsn/utility/strings.h>
#include <dsn/utility/string_conv.h>
......@@ -43,12 +44,13 @@ int main(int argc, char **argv)
std::cerr << "max_level is invalid: " << argv[6] << std::endl;
return -1;
}
pegasus::geo::geo_client my_geo("config.ini",
cluster_name.c_str(),
app_name.c_str(),
geo_app_name.c_str(),
new pegasus::geo::latlng_extractor_for_lbs());
my_geo.set_max_level(max_level);
pegasus::geo::geo_client my_geo(
"config.ini", cluster_name.c_str(), app_name.c_str(), geo_app_name.c_str());
if (!my_geo.set_max_level(max_level).is_ok()) {
std::cerr << "set_max_level failed" << std::endl;
return -1;
}
// cover beijing 5th ring road
S2LatLngRect rect(S2LatLng::FromDegrees(39.810151, 116.194511),
......
......@@ -61,10 +61,12 @@ rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 5000
[uri-resolver.dsn://onebox]
factory = partition_resolver_simple
arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
[pegasus.clusters]
onebox = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
[geo_client.lib]
;NOTE: 'min_level' is immutable after some data has been inserted into DB by geo_client.
min_level = 12
max_level = 16
latitude_index = 5
longitude_index = 4
......@@ -9,6 +9,7 @@
#include <s2/s2cap.h>
#include <dsn/service_api_cpp.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/errors.h>
#include <base/pegasus_key_schema.h>
#include <base/pegasus_utils.h>
......@@ -34,8 +35,7 @@ struct SearchResultFarther
geo_client::geo_client(const char *config_file,
const char *cluster_name,
const char *common_app_name,
const char *geo_app_name,
latlng_extractor *extractor)
const char *geo_app_name)
{
bool ok = pegasus_client_factory::initialize(config_file);
dassert(ok, "init pegasus client factory failed");
......@@ -46,11 +46,38 @@ geo_client::geo_client(const char *config_file,
_geo_data_client = pegasus_client_factory::get_client(cluster_name, geo_app_name);
dassert(_geo_data_client != nullptr, "init pegasus _geo_data_client failed");
_extractor.reset(extractor);
_min_level = (int32_t)dsn_config_get_value_uint64(
"geo_client.lib", "min_level", 12, "min cell level for scan");
// default: 16. edge length at level 16 is about 150m
_max_level = (int32_t)dsn_config_get_value_uint64(
"geo_client.lib", "max_level", 16, "max cell level for scan");
dassert_f(_min_level < _max_level,
"_min_level({}) must be less than _max_level({})",
_min_level,
_max_level);
uint32_t latitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "latitude_index", 5, "latitude index in value");
uint32_t longitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "longitude_index", 4, "longitude index in value");
dsn::error_s s = _extractor.set_latlng_indices(latitude_index, longitude_index);
dassert_f(s.is_ok(), "set_latlng_indices({}, {}) failed", latitude_index, longitude_index);
}
// Updates the max cell level used by this client.
//
// The new level is accepted only when it is strictly greater than `_min_level`;
// otherwise `_max_level` is left untouched and an ERR_INVALID_PARAMETERS error
// describing both values is returned.
dsn::error_s geo_client::set_max_level(int level)
{
    const bool above_min_level = level > _min_level;
    if (above_min_level) {
        _max_level = level;
        return dsn::error_s::ok();
    }

    return dsn::FMT_ERR(dsn::ERR_INVALID_PARAMETERS,
                        "level({}) must be larger than _min_level({})",
                        level,
                        _min_level);
}
int geo_client::set(const std::string &hash_key,
......@@ -383,7 +410,7 @@ void geo_client::async_search_radial(const std::string &hash_key,
}
S2LatLng latlng;
if (!_extractor->extract_from_value(value_, latlng)) {
if (!_extractor.extract_from_value(value_, latlng)) {
derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}",
hash_key,
sort_key,
......@@ -575,7 +602,7 @@ bool geo_client::generate_geo_keys(const std::string &hash_key,
{
// extract latitude and longitude from value
S2LatLng latlng;
if (!_extractor->extract_from_value(value, latlng)) {
if (!_extractor.extract_from_value(value, latlng)) {
derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}",
hash_key,
sort_key,
......@@ -602,7 +629,7 @@ bool geo_client::restore_origin_keys(const std::string &geo_sort_key,
std::string &origin_sort_key)
{
// geo_sort_key: [0,3]{30-_min_level}:combine_keys
int cid_prefix_len = 30 - _min_level + 1;
int cid_prefix_len = 30 - _min_level + 1; // '1' is for ':' in geo_sort_key
if (geo_sort_key.length() <= cid_prefix_len) {
return false;
}
......@@ -758,7 +785,7 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
}
S2LatLng latlng;
if (!_extractor->extract_from_value(value, latlng)) {
if (!_extractor.extract_from_value(value, latlng)) {
derror_f("extract_from_value failed. value={}", value);
cb();
return;
......@@ -845,7 +872,7 @@ void geo_client::async_distance(const std::string &hash_key1,
}
S2LatLng latlng;
if (!_extractor->extract_from_value(value_, latlng)) {
if (!_extractor.extract_from_value(value_, latlng)) {
derror_f("extract_from_value failed. value={}", value_);
*ret = PERR_GEO_DECODE_VALUE_ERROR;
}
......
......@@ -12,6 +12,10 @@
#include <pegasus/client.h>
#include "latlng_extractor.h"
namespace dsn {
class error_s;
} // namespace dsn
namespace pegasus {
namespace geo {
......@@ -79,8 +83,7 @@ public:
geo_client(const char *config_file,
const char *cluster_name,
const char *common_app_name,
const char *geo_app_name,
latlng_extractor *extractor);
const char *geo_app_name);
~geo_client() { _tracker.wait_outstanding_tasks(); }
......@@ -285,7 +288,7 @@ public:
return _common_data_client->get_error_string(error_code);
}
void set_max_level(int level) { _max_level = level; }
dsn::error_s set_max_level(int level);
private:
friend class geo_client_test;
......@@ -394,18 +397,18 @@ private:
private:
// cell id at this level is the hash-key in pegasus
// `_min_level` is immutable after geo_client data has been inserted into DB.
const int _min_level = 12; // edge length at level 12 is about 2km
int _min_level = 12; // edge length at level 12 is about 2km
// cell id at this level is the prefix of sort-key in pegasus, and
// it's convenient for scan operation
// `_max_level` is mutable at any time, and geo_client-lib users can change it to an appropriate
// value to improve performance in their scenario.
int _max_level = 16;
int _max_level = 16; // edge length at level 16 is about 150m
dsn::task_tracker _tracker;
std::shared_ptr<const latlng_extractor> _extractor = nullptr;
latlng_extractor _extractor;
pegasus_client *_common_data_client = nullptr;
pegasus_client *_geo_data_client = nullptr;
};
......
......@@ -2,25 +2,28 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <dsn/utility/string_conv.h>
#include "latlng_extractor.h"
#include <dsn/utility/error_code.h>
#include <dsn/utility/errors.h>
#include <dsn/utility/string_conv.h>
namespace pegasus {
namespace geo {
void extract_indexs(const std::string &text,
const std::vector<int> &indexs,
std::vector<std::string> &values,
char splitter)
void extract_indices(const std::string &line,
const std::vector<int> &sorted_indices,
std::vector<std::string> &values,
char splitter)
{
size_t begin_pos = 0;
size_t end_pos = 0;
int cur_index = -1;
for (auto index : indexs) {
for (auto index : sorted_indices) {
while (cur_index < index) {
begin_pos = (cur_index == -1 ? 0 : end_pos + 1); // at first time, seek from 0
// then, seek from end_pos + 1
end_pos = text.find(splitter, begin_pos);
end_pos = line.find(splitter, begin_pos);
if (end_pos == std::string::npos) {
break;
}
......@@ -28,32 +31,26 @@ void extract_indexs(const std::string &text,
}
if (end_pos == std::string::npos) {
values.emplace_back(text.substr(begin_pos));
values.emplace_back(line.substr(begin_pos));
break;
} else {
values.emplace_back(text.substr(begin_pos, end_pos - begin_pos));
values.emplace_back(line.substr(begin_pos, end_pos - begin_pos));
}
}
}
const char *latlng_extractor_for_lbs::name() const { return "latlng_extractor_for_lbs"; }
const char *latlng_extractor_for_lbs::value_sample() const
{
return "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|160.356396|39.469644|24.0|4.15|0|-1";
}
bool latlng_extractor_for_lbs::extract_from_value(const std::string &value, S2LatLng &latlng) const
bool latlng_extractor::extract_from_value(const std::string &value, S2LatLng &latlng)
{
std::vector<std::string> data;
extract_indexs(value, {4, 5}, data, '|');
extract_indices(value, _sorted_indices, data, '|');
if (data.size() != 2) {
return false;
}
std::string lng = data[0];
std::string lat = data[1];
double lat_degrees, lng_degrees = 0.0;
std::string &lat = data[_latlng_reversed ? 1 : 0];
std::string &lng = data[_latlng_reversed ? 0 : 1];
double lat_degrees = 0.0;
double lng_degrees = 0.0;
if (!dsn::buf2double(lat, lat_degrees) || !dsn::buf2double(lng, lng_degrees)) {
return false;
}
......@@ -62,5 +59,20 @@ bool latlng_extractor_for_lbs::extract_from_value(const std::string &value, S2La
return latlng.is_valid();
}
// Records which fields of a value hold the latitude and the longitude.
//
// The two indices are stored in ascending order in `_sorted_indices`, and
// `_latlng_reversed` remembers whether the longitude index came first, so that
// extract_from_value() can map the extracted fields back to latitude/longitude.
//
// Returns ERR_INVALID_PARAMETERS (leaving the current settings untouched) when
// the indices are equal, or when either one does not fit into the 'int'
// elements of `_sorted_indices`. Returns ok otherwise.
dsn::error_s latlng_extractor::set_latlng_indices(uint32_t latitude_index, uint32_t longitude_index)
{
    if (latitude_index == longitude_index) {
        return dsn::error_s::make(dsn::ERR_INVALID_PARAMETERS,
                                  "latitude_index and longitude_index should not be equal");
    }

    // The indices are narrowed to 'int' below. A value above INT32_MAX would wrap to a
    // negative index (assuming a 32-bit int) and could never match a field, so reject it
    // here instead of silently storing an unusable index.
    const uint32_t max_index = static_cast<uint32_t>(INT32_MAX);
    if (latitude_index > max_index || longitude_index > max_index) {
        return dsn::error_s::make(dsn::ERR_INVALID_PARAMETERS,
                                  "latitude_index and longitude_index are too large");
    }

    if (latitude_index < longitude_index) {
        _sorted_indices = {static_cast<int>(latitude_index), static_cast<int>(longitude_index)};
        _latlng_reversed = false;
    } else {
        _sorted_indices = {static_cast<int>(longitude_index), static_cast<int>(latitude_index)};
        _latlng_reversed = true;
    }
    return dsn::error_s::ok();
}
} // namespace geo
} // namespace pegasus
......@@ -9,24 +9,29 @@
#include <s2/s2latlng.h>
#include <dsn/utility/strings.h>
namespace dsn {
class error_s;
} // namespace dsn
namespace pegasus {
namespace geo {
class latlng_extractor
{
public:
virtual ~latlng_extractor() = default;
virtual const char *name() const = 0;
virtual const char *value_sample() const = 0;
virtual bool extract_from_value(const std::string &value, S2LatLng &latlng) const = 0;
};
// Extract latitude and longitude from value.
// Return true when succeed.
bool extract_from_value(const std::string &value, S2LatLng &latlng);
class latlng_extractor_for_lbs : public latlng_extractor
{
public:
const char *name() const final;
const char *value_sample() const final;
bool extract_from_value(const std::string &value, S2LatLng &latlng) const final;
// Set the latitude and longitude indices within a string-type value. The indices are
// the 0-based positions of the fields obtained by splitting the value on '|'.
dsn::error_s set_latlng_indices(uint32_t latitude_index, uint32_t longitude_index);
private:
// Latitude index and longitude index in sorted order.
std::vector<int> _sorted_indices;
// True when '_sorted_indices' is in longitude-latitude order, i.e. the longitude index
// is the smaller one; false when it is in latitude-longitude order.
bool _latlng_reversed = false;
};
} // namespace geo
......
......@@ -61,6 +61,5 @@ rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 5000
[uri-resolver.dsn://onebox]
factory = partition_resolver_simple
arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
[pegasus.clusters]
onebox = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
......@@ -5,63 +5,107 @@
#include <memory>
#include <gtest/gtest.h>
#include <geo/lib/latlng_extractor.h>
#include <dsn/utility/errors.h>
namespace pegasus {
namespace geo {
static std::shared_ptr<latlng_extractor_for_lbs> lbs_extractor =
std::make_shared<latlng_extractor_for_lbs>();
TEST(latlng_extractor_for_lbs_test, extract_from_value)
TEST(latlng_extractor_test, set_latlng_indices)
{
ASSERT_EQ(std::string(lbs_extractor->name()), "latlng_extractor_for_lbs");
latlng_extractor extractor;
ASSERT_FALSE(extractor.set_latlng_indices(3, 3).is_ok());
ASSERT_TRUE(extractor.set_latlng_indices(3, 4).is_ok());
ASSERT_TRUE(extractor.set_latlng_indices(4, 3).is_ok());
}
S2LatLng latlng;
ASSERT_TRUE(lbs_extractor->extract_from_value(lbs_extractor->value_sample(), latlng));
TEST(latlng_extractor_for_lbs_test, extract_from_value)
{
latlng_extractor extractor;
ASSERT_TRUE(extractor.set_latlng_indices(5, 4).is_ok());
double lat_degrees = 12.345;
double lng_degrees = 67.890;
S2LatLng latlng;
std::string test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) +
"|24.043028|4.15921|0|-1";
ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001);
ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "|2018-04-26|2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001);
ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e||2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001);
ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) + "||4.15921|0|-1";
ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001);
ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) +
"|24.043028|4.15921|0|";
ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001);
ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001);
ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr||" +
std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1";
ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" +
std::to_string(lng_degrees) + "||24.043028|4.15921|0|-1";
ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|||24.043028|4.15921|0|-1";
ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng));
ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
}
// Exercises extract_from_value() with latitude at field 0 and longitude at field 1,
// i.e. values shaped like "<lat>|<lng>[|...]".
TEST(latlng_extractor_for_aibox_test, extract_from_value)
{
    latlng_extractor extractor;
    ASSERT_TRUE(extractor.set_latlng_indices(0, 1).is_ok());

    const double lat_degrees = 12.345;
    const double lng_degrees = 67.890;
    const std::string lat_text = std::to_string(lat_degrees);
    const std::string lng_text = std::to_string(lng_degrees);
    S2LatLng latlng;

    // Values that carry both coordinates in the first two fields; anything that
    // follows them must not affect the result.
    const std::string accepted_values[] = {
        lat_text + "|" + lng_text,
        lat_text + "|" + lng_text + "|24.043028",
        lat_text + "|" + lng_text + "||",
    };
    for (const std::string &test_value : accepted_values) {
        ASSERT_TRUE(extractor.extract_from_value(test_value, latlng));
        EXPECT_DOUBLE_EQ(latlng.lat().degrees(), lat_degrees);
        EXPECT_DOUBLE_EQ(latlng.lng().degrees(), lng_degrees);
    }

    // Values whose first or second field is empty are expected to be rejected.
    const std::string rejected_values[] = {
        "|" + lat_text + "|" + lng_text,
        "|" + lat_text,
        lng_text + "|",
        "|",
    };
    for (const std::string &test_value : rejected_values) {
        ASSERT_FALSE(extractor.extract_from_value(test_value, latlng));
    }
}
} // namespace geo
......
......@@ -20,8 +20,7 @@ class geo_client_test : public ::testing::Test
public:
geo_client_test()
{
_geo_client.reset(new pegasus::geo::geo_client(
"config.ini", "onebox", "temp", "temp_geo", new latlng_extractor_for_lbs()));
_geo_client.reset(new pegasus::geo::geo_client("config.ini", "onebox", "temp", "temp_geo"));
}
pegasus_client *common_data_client() { return _geo_client->_common_data_client; }
......
......@@ -8,7 +8,7 @@ count = 1
[apps.proxy]
name = proxy
type = proxy
arguments = redis_cluster temp
arguments = onebox temp
ports = 6379
pools = THREAD_POOL_DEFAULT
run = true
......@@ -123,7 +123,6 @@ falcon_host = 127.0.0.1
falcon_port = 1988
falcon_path = /v1/push
[uri-resolver.dsn://redis_cluster]
factory = partition_resolver_simple
arguments = localhost:34601
[pegasus.clusters]
onebox = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
......@@ -66,11 +66,8 @@ redis_parser::redis_parser(proxy_stub *op, dsn::message_ex *first_msg)
meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), op->get_cluster());
r = new ::dsn::apps::rrdb_client(op->get_cluster(), meta_list, op->get_app());
if (strlen(op->get_geo_app()) != 0) {
_geo_client = dsn::make_unique<geo::geo_client>("config.ini",
op->get_cluster(),
op->get_app(),
op->get_geo_app(),
new geo::latlng_extractor_for_lbs());
_geo_client = dsn::make_unique<geo::geo_client>(
"config.ini", op->get_cluster(), op->get_app(), op->get_geo_app());
}
} else {
r = new ::dsn::apps::rrdb_client();
......
......@@ -8,7 +8,7 @@ count = 1
[apps.proxy]
name = proxy
type = proxy
arguments = redis_cluster temp
arguments = onebox temp
ports = 12345
pools = THREAD_POOL_DEFAULT
run = true
......@@ -92,7 +92,5 @@ allow_inline = false
is_trace = false
allow_inline = false
[uri-resolver.dsn://redis_cluster]
factory = partition_resolver_simple
arguments = localhost:34601, localhost:34602, localhost:34603
[pegasus.clusters]
onebox = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
......@@ -1709,12 +1709,10 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
std::unique_ptr<pegasus::geo::geo_client> target_geo_client;
if (is_geo_data) {
target_geo_client.reset(
new pegasus::geo::geo_client("config.ini",
target_cluster_name.c_str(),
target_app_name.c_str(),
target_geo_app_name.c_str(),
new pegasus::geo::latlng_extractor_for_lbs()));
target_geo_client.reset(new pegasus::geo::geo_client("config.ini",
target_cluster_name.c_str(),
target_app_name.c_str(),
target_geo_app_name.c_str()));
}
std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册