提交 42cdf9a1 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

add DeleteByRange interface

See merge request megasearch/milvus!396

Former-commit-id: f875eab044460d5f2ce81bc04acc9784e959e8d0
......@@ -107,13 +107,18 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
//dates partly delete files of the table but currently we don't support
ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
if (dates.empty()) {
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
} else {
meta_ptr_->DropPartitionsByDates(table_id, dates);
}
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
return Status::OK();
}
......
......@@ -302,6 +302,15 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "DropIndex function call status: " << stat.ToString() << std::endl;
}
{//delete by range
Range rg;
rg.start_value = CurrentTmDate(-2);
rg.end_value = CurrentTmDate(-3);
Status stat = conn->DeleteByRange(rg, TABLE_NAME);
std::cout << "DeleteByRange function call status: " << stat.ToString() << std::endl;
}
{//delete table
Status stat = conn->DropTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
......
......@@ -330,7 +330,15 @@ ClientProxy::ServerStatus() const {
Status
ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) {
try {
::milvus::grpc::DeleteByRangeParam delete_by_range_param;
delete_by_range_param.set_table_name(table_name);
delete_by_range_param.mutable_range()->set_start_value(range.start_value);
delete_by_range_param.mutable_range()->set_end_value(range.end_value);
return client_ptr_->DeleteByRange(delete_by_range_param);
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
}
}
Status
......@@ -341,7 +349,7 @@ ClientProxy::PreloadTable(const std::string &table_name) const {
Status status = client_ptr_->PreloadTable(grpc_table_name);
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
}
}
......
......@@ -265,13 +265,26 @@ GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) {
}
Status
GrpcClient::Disconnect() {
stub_.release();
GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->DeleteByRange(&context, delete_by_range_param, &response);
if (!grpc_status.ok()) {
std::cerr << "DeleteByRange gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
std::cerr << response.reason() << std::endl;
return Status(StatusCode::ServerFailed, response.reason());
}
return Status::OK();
}
Status
GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) {
GrpcClient::Disconnect() {
stub_.release();
return Status::OK();
}
......
......@@ -117,7 +117,7 @@ ConnectionImpl::ServerStatus() const {
Status
ConnectionImpl::DeleteByRange(Range &range,
const std::string &table_name) {
return client_proxy_->DeleteByRange(range, table_name);
}
Status
......
......@@ -168,7 +168,12 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context,
GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
const ::milvus::grpc::DeleteByRangeParam *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DeleteByRangeTask::Create(*request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
}
::grpc::Status
......
......@@ -716,6 +716,73 @@ CmdTask::OnExecute() {
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
delete_by_range_param_(delete_by_range_param){
}
BaseTaskPtr
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) {
return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param));
}
ServerError
DeleteByRangeTask::OnExecute() {
try {
TimeRecorder rc("DeleteByRangeTask");
//step 1: check arguments
std::string table_name = delete_by_range_param_.table_name();
ServerError res = ValidationUtil::ValidateTableName(table_name);
if (res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name);
}
//step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name;
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
if (!stat.ok()) {
if (stat.IsNotFound()) {
return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists");
} else {
return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
}
}
rc.ElapseFromBegin("check validation");
//step 3: check date range, and convert to db dates
std::vector<DB_DATE> dates;
ServerError error_code = SERVER_SUCCESS;
std::string error_msg;
std::vector<::milvus::grpc::Range> range_array;
range_array.emplace_back(delete_by_range_param_.range());
ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
if (error_code != SERVER_SUCCESS) {
return SetError(error_code, error_msg);
}
#ifdef MILVUS_ENABLE_PROFILING
std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
"_top_" + std::to_string(this->top_k_) + "_" +
GetCurrTimeStr() + ".profiling";
ProfilerStart(fname.c_str());
#endif
engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates);
if (!status.ok()) {
return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
}
} catch (std::exception &ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
......
......@@ -9,6 +9,7 @@
#include "db/meta/MetaConsts.h"
#include "db/Factories.h"
#include "cache/CpuCacheMgr.h"
#include "utils/CommonUtil.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
......@@ -26,6 +27,8 @@ namespace {
static constexpr int64_t TABLE_DIM = 256;
static constexpr int64_t VECTOR_COUNT = 250000;
static constexpr int64_t INSERT_LOOP = 10000;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
static constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
engine::meta::TableSchema BuildTableSchema() {
engine::meta::TableSchema table_info;
......@@ -45,6 +48,52 @@ namespace {
}
}
std::string CurrentTmDate(int64_t offset_day = 0) {
time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
+ "-" + std::to_string(t->tm_mday);
return str;
}
void
ConvertTimeRangeToDBDates(const std::string &start_value,
const std::string &end_value,
std::vector<engine::meta::DateT > &dates) {
dates.clear();
time_t tt_start, tt_end;
tm tm_start, tm_end;
if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(start_value, tt_start, tm_start)) {
return;
}
if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(end_value, tt_end, tm_end)) {
return;
}
long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
DAY_SECONDS;
if (days == 0) {
return;
}
for (long i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS * i;
tm tm_day;
zilliz::milvus::server::CommonUtil::ConvertTime(tt_day, tm_day);
long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
tm_day.tm_mday;//according to db logic
dates.push_back(date);
}
}
}
TEST_F(DBTest, CONFIG_TEST) {
......@@ -313,8 +362,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
};
TEST_F(DBTest2, DELETE_TEST) {
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
......@@ -349,4 +396,45 @@ TEST_F(DBTest2, DELETE_TEST) {
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
};
\ No newline at end of file
};
TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids;
uint64_t size;
db_->Size(size);
int64_t nb = INSERT_LOOP;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 20;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::vector<engine::meta::DateT> dates;
std::string start_value = CurrentTmDate(-3);
std::string end_value = CurrentTmDate(-2);
ConvertTimeRangeToDBDates(start_value, end_value, dates);
db_->DeleteTable(TABLE_NAME, dates);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册