提交 7da06a7e 编写于 作者: G groot

support date range search


Former-commit-id: ec9841b7c586f27afadf90e7c49914850ad4d299
上级 8ef34c91
......@@ -5,7 +5,7 @@
******************************************************************************/
#include "PrometheusMetrics.h"
#include "utils/Log.h"
namespace zilliz {
namespace vecwise {
......@@ -13,18 +13,23 @@ namespace server {
// Read the metric section of the server configuration and start the Prometheus exposer.
//
// Returns SERVER_SUCCESS once the exposer has been created and the registry registered,
// or SERVER_UNEXPECTED_ERROR when any step throws a std::exception.
ServerError
PrometheusMetrics::Init() {
    // The whole sequence runs exactly once and entirely inside the try block, so a failure
    // while reading the configuration or creating the exposer is reported, not propagated.
    try {
        ConfigNode &configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
        startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true : false;

        // Following should be read from config file.
        const std::string bind_address = configNode.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT);
        const std::string uri = std::string("/metrics");
        const std::size_t num_threads = 2;

        // Init Exposer
        exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);

        // Exposer Registry
        exposer_ptr_->RegisterCollectable(registry_);
    } catch (const std::exception& ex) {
        SERVER_LOG_ERROR << "Failed to connect prometheus server: " << std::string(ex.what());
        return SERVER_UNEXPECTED_ERROR;
    }

    return SERVER_SUCCESS;
}
......
......@@ -75,6 +75,18 @@ namespace {
return str;
}
// Build the current date as "YEAR-MONTH-DAY" (no zero padding) for a fixed UTC+8 offset.
// Returns an empty string if the clock value cannot be broken down into calendar fields.
std::string CurrentTmDate() {
    // shift the UTC clock by eight hours, then break it down as if it were UTC
    const time_t shifted = time(nullptr) + 8*3600;

    // gmtime_r fills a caller-owned struct instead of returning shared static storage
    tm parts;
    if(gmtime_r(&shifted, &parts) == nullptr) {
        return std::string();
    }

    // tm_year counts from 1900 and tm_mon is zero-based
    return std::to_string(parts.tm_year + 1900) + "-" + std::to_string(parts.tm_mon + 1)
           + "-" + std::to_string(parts.tm_mday);
}
std::string GetTableName() {
static std::string s_id(CurrentTime());
return s_id;
......@@ -170,6 +182,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, record_array);
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate();
query_range_array.emplace_back(rg);
std::vector<TopKQueryResult> topk_query_result_array;
Status stat = conn->SearchVector(TABLE_NAME, record_array, query_range_array, TOP_K, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
......
......@@ -159,6 +159,8 @@ ClientProxy::SearchVector(const std::string &table_name,
}
try {
//step 1: convert vectors data
std::vector<thrift::RowRecord> thrift_records;
for(auto& record : query_record_array) {
thrift::RowRecord thrift_record;
......@@ -172,10 +174,21 @@ ClientProxy::SearchVector(const std::string &table_name,
thrift_records.emplace_back(thrift_record);
}
//step 2: convert range array
std::vector<thrift::Range> thrift_ranges;
for(auto& range : query_range_array) {
thrift::Range thrift_range;
thrift_range.__set_start_value(range.start_value);
thrift_range.__set_end_value(range.end_value);
thrift_ranges.emplace_back(thrift_range);
}
//step 3: search vectors
std::vector<thrift::TopKQueryResult> result_array;
ClientPtr()->interface()->SearchVector(result_array, table_name, thrift_records, thrift_ranges, topk);
//step 4: convert result array
for(auto& thrift_topk_result : result_array) {
TopKQueryResult result;
......
......@@ -75,6 +75,74 @@ namespace {
return map_type[type];
}
// Flatten the packed double payload of every record into one contiguous float array.
//
// record_array : records whose vector_data bytes are read as an array of double
// dimension    : number of values each record is required to hold
// float_array  : output, resized to record_array.size()*dimension; record i is written
//                starting at index i*dimension
//
// Returns SERVER_SUCCESS, SERVER_INVALID_ARGUMENT for an empty payload or one whose byte
// length is not a whole number of doubles, or SERVER_INVALID_VECTOR_DIMENSION when a
// record holds a different number of values than 'dimension'.
ServerError
ConvertRowRecordToFloatArray(const std::vector<thrift::RowRecord>& record_array,
                             uint64_t dimension,
                             std::vector<float>& float_array) {
    const uint64_t vec_count = record_array.size();
    float_array.resize(vec_count*dimension);//allocate enough memory

    for(uint64_t i = 0; i < vec_count; i++) {
        const auto& record = record_array[i];
        if(record.vector_data.empty()) {
            SERVER_LOG_ERROR << "No vector provided in record";
            return SERVER_INVALID_ARGUMENT;
        }

        // a trailing partial value would otherwise be dropped silently by the division below
        const uint64_t byte_count = record.vector_data.size();
        if(byte_count % sizeof(double) != 0) {
            SERVER_LOG_ERROR << "Invalid vector data size: " << byte_count;
            return SERVER_INVALID_ARGUMENT;
        }

        const uint64_t vec_dim = byte_count/sizeof(double);//how many double value?
        if(vec_dim != dimension) {
            SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
                             << " vs. group dimension:" << dimension;
            return SERVER_INVALID_VECTOR_DIMENSION;
        }

        //convert double array to float array(thrift has no float type)
        const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
        for(uint64_t d = 0; d < vec_dim; d++) {
            float_array[i*vec_dim + d] = static_cast<float>(d_p[d]);
        }
    }

    return SERVER_SUCCESS;
}
// Number of seconds in one day; used as the step when walking a range day by day.
static constexpr long DAY_SECONDS = 86400;

// Expand every time range into the list of per-day date codes passed to the db query.
//
// range_array : ranges whose start_value/end_value are time strings parsed by
//               CommonUtil::TimeStrToTime
// dates       : output; cleared first, then one entry per day of every range is appended
//
// A range whose end lies before its start is treated as the same interval with the bounds
// swapped, so the generated days always lie between the two given times.
//
// Returns SERVER_SUCCESS, or SERVER_INVALID_TIME_RANGE when a bound cannot be parsed.
ServerError
ConvertTimeRangeToDBDates(const std::vector<megasearch::thrift::Range> &range_array,
                          std::vector<DB_DATE>& dates) {
    dates.clear();
    for(auto& range : range_array) {
        time_t tt_start, tt_end;
        tm tm_start, tm_end;
        if(!CommonUtil::TimeStrToTime(range.start_value, tt_start, tm_start)){
            SERVER_LOG_ERROR << "Invalid time range: " << range.start_value;
            return SERVER_INVALID_TIME_RANGE;
        }

        if(!CommonUtil::TimeStrToTime(range.end_value, tt_end, tm_end)){
            SERVER_LOG_ERROR << "Invalid time range: " << range.end_value;
            return SERVER_INVALID_TIME_RANGE;
        }

        // always walk forward from the earlier bound, whichever order the caller used
        const time_t tt_first = (tt_end > tt_start) ? tt_start : tt_end;
        const time_t tt_last = (tt_end > tt_start) ? tt_end : tt_start;
        const long days = (tt_last - tt_first)/DAY_SECONDS;
        for(long i = 0; i <= days; i++) {
            time_t tt_day = tt_first + DAY_SECONDS*i;
            tm tm_day;
            CommonUtil::ConvertTime(tt_day, tm_day);

            // raw tm fields are used on purpose: tm_year counts from 1900, tm_mon is zero-based
            long date = tm_day.tm_year*10000 + tm_day.tm_mon*100 + tm_day.tm_mday;//according to db logic
            dates.push_back(date);
        }
    }

    return SERVER_SUCCESS;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -237,37 +305,17 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("check validation");
//step 2: prepare float data
uint64_t vec_count = (uint64_t)record_array_.size();
uint64_t group_dim = table_info.dimension_;
std::vector<float> vec_f;
vec_f.resize(vec_count*group_dim);//allocate enough memory
for(uint64_t i = 0; i < vec_count; i++) {
const auto& record = record_array_[i];
if(record.vector_data.empty()) {
error_code_ = SERVER_INVALID_ARGUMENT;
error_msg_ = "No vector provided in record";
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value?
if(vec_dim != group_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
//convert double array to float array(thrift has no float type)
const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
for(uint64_t d = 0; d < vec_dim; d++) {
vec_f[i*vec_dim + d] = (float)(d_p[d]);
}
error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f);
if(error_code_ != SERVER_SUCCESS) {
error_msg_ = "Invalid row record data";
return error_code_;
}
rc.Record("prepare vectors data");
//step 3: insert vectors
uint64_t vec_count = (uint64_t)record_array_.size();
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
rc.Record("add vectors to engine");
if(!stat.ok()) {
......@@ -342,44 +390,29 @@ ServerError SearchVectorTask::OnExecute() {
return error_code_;
}
//step 3: check date range, and convert to db dates
std::vector<DB_DATE> dates;
error_code_ = ConvertTimeRangeToDBDates(range_array_, dates);
if(error_code_ != SERVER_SUCCESS) {
error_msg_ = "Invalid query range";
return error_code_;
}
rc.Record("check validation");
//step 3: prepare float data
std::vector<float> vec_f;
uint64_t record_count = (uint64_t)record_array_.size();
vec_f.resize(record_count*table_info.dimension_);
for(uint64_t i = 0; i < record_array_.size(); i++) {
const auto& record = record_array_[i];
if (record.vector_data.empty()) {
error_code_ = SERVER_INVALID_ARGUMENT;
error_msg_ = "Query record has no vector";
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = record.vector_data.size() / sizeof(double);//how many double value?
if (vec_dim != table_info.dimension_) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << table_info.dimension_;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
//convert double array to float array(thrift has no float type)
const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
for(uint64_t d = 0; d < vec_dim; d++) {
vec_f[i*vec_dim + d] = (float)(d_p[d]);
}
error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f);
if(error_code_ != SERVER_SUCCESS) {
error_msg_ = "Invalid row record data";
return error_code_;
}
rc.Record("prepare vector data");
//step 4: search vectors
std::vector<DB_DATE> dates;
engine::QueryResults results;
uint64_t record_count = (uint64_t)record_array_.size();
stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
rc.Record("search vectors from engine");
if(!stat.ok()) {
......
......@@ -14,6 +14,7 @@
#include <dirent.h>
#include <string.h>
#include <iostream>
#include <time.h>
#include "boost/filesystem.hpp"
......@@ -150,15 +151,39 @@ std::string CommonUtil::GetExePath() {
return exe_path;
}
void CommonUtil::ConvertTime(int year, int month, int day, int hour, int minute, int second, time_t& t_t) {
tm t_m;
t_m.tm_year = year;
t_m.tm_mon = month;
t_m.tm_mday = day;
t_m.tm_hour = hour;
t_m.tm_min = minute;
t_m.tm_sec = second;
t_t = mktime(&t_m);
// Parse a time string into both a time_t value and a broken-down tm.
//
// time_str     : text to parse, e.g. "2019-6-5" or "2019-6-5 12:30:00"
// time_integer : output; result of mktime on the parsed fields, 0 on failure
// time_struct  : output; parsed fields (year relative to 1900, zero-based month) as
//                normalized by mktime, all zero on failure
// format       : sscanf format; it must consume at most six int fields in the order
//                year, month, day, hour, minute, second, because exactly six int
//                destinations are supplied
//
// Returns true when at least year, month and day were read and mktime could represent
// the result. Hour, minute and second stay 0 when the string does not contain them.
bool CommonUtil::TimeStrToTime(const std::string& time_str,
                               time_t &time_integer,
                               tm &time_struct,
                               const std::string& format) {
    time_integer = 0;
    memset(&time_struct, 0, sizeof(tm));

    int ret = sscanf(time_str.c_str(),
                     format.c_str(),
                     &(time_struct.tm_year),
                     &(time_struct.tm_mon),
                     &(time_struct.tm_mday),
                     &(time_struct.tm_hour),
                     &(time_struct.tm_min),
                     &(time_struct.tm_sec));

    // Without a month and a day the zeroed fields would become month -1 / day 0, which
    // mktime silently normalizes to a date in the previous year; reject such input.
    if(ret < 3) {
        memset(&time_struct, 0, sizeof(tm));
        return false;
    }

    time_struct.tm_year -= 1900;
    time_struct.tm_mon--;

    const time_t converted = mktime(&time_struct);
    if(converted == static_cast<time_t>(-1)) {
        // mktime reports a time it cannot represent with -1
        memset(&time_struct, 0, sizeof(tm));
        return false;
    }

    time_integer = converted;
    return true;
}
// Break a time_t value down into local calendar fields.
// time_struct is zero-filled if the value cannot be converted.
void CommonUtil::ConvertTime(time_t time_integer, tm &time_struct) {
    // localtime_r writes straight into the caller's struct, so no shared static buffer
    // is involved and a failed conversion cannot be dereferenced
    if(localtime_r(&time_integer, &time_struct) == nullptr) {
        memset(&time_struct, 0, sizeof(tm));
    }
}

// Convert broken-down local calendar fields into a time_t value using mktime.
// Defined as a CommonUtil member so that it matches the static declaration in the class.
void CommonUtil::ConvertTime(tm time_struct, time_t &time_integer) {
    time_integer = mktime(&time_struct);
}
}
......
......@@ -26,7 +26,13 @@ class CommonUtil {
static std::string GetExePath();
static void ConvertTime(int year, int month, int day, int hour, int minute, int second, time_t& t_t);
static bool TimeStrToTime(const std::string& time_str,
time_t &time_integer,
tm &time_struct,
const std::string& format = "%d-%d-%d %d:%d:%d");
static void ConvertTime(time_t time_integer, tm &time_struct);
static void ConvertTime(tm time_struct, time_t &time_integer);
};
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册