diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index bccc6bbf925941d7536e3b2e56eb9e0890f2a1d2..a19fd340daaa194c7c4cd22785896ede75178eda 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -107,13 +107,18 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date //dates partly delete files of the table but currently we don't support ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id; - mem_mgr_->EraseMemVector(table_id); //not allow insert - meta_ptr_->DeleteTable(table_id); //soft delete table + if (dates.empty()) { + mem_mgr_->EraseMemVector(table_id); //not allow insert + meta_ptr_->DeleteTable(table_id); //soft delete table + + //scheduler will determine when to delete table files + TaskScheduler& scheduler = TaskScheduler::GetInstance(); + DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); + scheduler.Schedule(context); + } else { + meta_ptr_->DropPartitionsByDates(table_id, dates); + } - //scheduler will determine when to delete table files - TaskScheduler& scheduler = TaskScheduler::GetInstance(); - DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); - scheduler.Schedule(context); return Status::OK(); } diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index b4f7bf09223e7362b936e54a6ef24acd43610f6b..0022f00282b2316de620e88626290d0ac38fc688 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -302,6 +302,15 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::cout << "DropIndex function call status: " << stat.ToString() << std::endl; } + {//delete by range + Range rg; + rg.start_value = CurrentTmDate(-2); + rg.end_value = CurrentTmDate(-3); + + Status stat = conn->DeleteByRange(rg, TABLE_NAME); + std::cout << "DeleteByRange function call status: " << stat.ToString() << std::endl; + } + {//delete table Status stat = conn->DropTable(TABLE_NAME); std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl; diff --git a/cpp/src/sdk/grpc/ClientProxy.cpp b/cpp/src/sdk/grpc/ClientProxy.cpp index 48c5159bb5ca58840079323296033216f8ee3606..6421a34d198af568e56043ffe827ddf9db24f60f 100644 --- a/cpp/src/sdk/grpc/ClientProxy.cpp +++ b/cpp/src/sdk/grpc/ClientProxy.cpp @@ -330,7 +330,15 @@ ClientProxy::ServerStatus() const { Status ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) { - + try { + ::milvus::grpc::DeleteByRangeParam delete_by_range_param; + delete_by_range_param.set_table_name(table_name); + delete_by_range_param.mutable_range()->set_start_value(range.start_value); + delete_by_range_param.mutable_range()->set_end_value(range.end_value); + return client_ptr_->DeleteByRange(delete_by_range_param); + } catch (std::exception &ex) { + return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what())); + } } Status @@ -341,7 +349,7 @@ ClientProxy::PreloadTable(const std::string &table_name) const { Status status = client_ptr_->PreloadTable(grpc_table_name); return status; } catch (std::exception &ex) { - return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what())); + return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what())); } } diff --git a/cpp/src/sdk/grpc/GrpcClient.cpp b/cpp/src/sdk/grpc/GrpcClient.cpp index 77478e5d7efd68f010068b7fbc68a30572eb9a4c..49ce3556567cc94fc86bd36311e12249de45f168 100644 --- a/cpp/src/sdk/grpc/GrpcClient.cpp +++ b/cpp/src/sdk/grpc/GrpcClient.cpp @@ -265,13 +265,26 @@ GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) { } Status -GrpcClient::Disconnect() { - stub_.release(); +GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) { + ClientContext context; + ::milvus::grpc::Status response; + ::grpc::Status grpc_status = stub_->DeleteByRange(&context, delete_by_range_param, &response); + + if (!grpc_status.ok()) { + std::cerr << "DeleteByRange gRPC failed!" << std::endl; + return Status(StatusCode::RPCFailed, grpc_status.error_message()); + } + + if (response.error_code() != grpc::SUCCESS) { + std::cerr << response.reason() << std::endl; + return Status(StatusCode::ServerFailed, response.reason()); + } return Status::OK(); } Status -GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) { +GrpcClient::Disconnect() { + stub_.release(); return Status::OK(); } diff --git a/cpp/src/sdk/interface/ConnectionImpl.cpp b/cpp/src/sdk/interface/ConnectionImpl.cpp index 355c06438c5814809bdbd4b7e07e30dfefeb90b2..0f3080574fba174483e85ad9122742c04e759f11 100644 --- a/cpp/src/sdk/interface/ConnectionImpl.cpp +++ b/cpp/src/sdk/interface/ConnectionImpl.cpp @@ -117,7 +117,7 @@ ConnectionImpl::ServerStatus() const { Status ConnectionImpl::DeleteByRange(Range &range, const std::string &table_name) { - + return client_proxy_->DeleteByRange(range, table_name); } Status diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index ad1a0e3d7110544968b42aa3e666246d2b58e0ee..437573601fec8d41e0091ccb996905abb9a79694 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -168,7 +168,12 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context, GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, const ::milvus::grpc::DeleteByRangeParam *request, ::milvus::grpc::Status *response) { - + BaseTaskPtr task_ptr = DeleteByRangeTask::Create(*request); + ::milvus::grpc::Status grpc_status; + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); + response->set_error_code(grpc_status.error_code()); + response->set_reason(grpc_status.reason()); + return ::grpc::Status::OK; } ::grpc::Status diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 10ca2b80ed4a622498299cdd075a01380165bf7f..db96ad6202d624b36f148d198bb7ffc839c79d5e 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -716,6 +716,73 @@ CmdTask::OnExecute() { return SERVER_SUCCESS; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) + : GrpcBaseTask(DDL_DML_TASK_GROUP), + delete_by_range_param_(delete_by_range_param){ +} + +BaseTaskPtr +DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) { + return std::shared_ptr(new DeleteByRangeTask(delete_by_range_param)); +} + +ServerError +DeleteByRangeTask::OnExecute() { + try { + TimeRecorder rc("DeleteByRangeTask"); + + //step 1: check arguments + std::string table_name = delete_by_range_param_.table_name(); + ServerError res = ValidationUtil::ValidateTableName(table_name); + if (res != SERVER_SUCCESS) { + return SetError(res, "Invalid table name: " + table_name); + } + + //step 2: check table existence + engine::meta::TableSchema table_info; + table_info.table_id_ = table_name; + engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); + if (!stat.ok()) { + if (stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists"); + } else { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } + } + + rc.ElapseFromBegin("check validation"); + + //step 3: check date range, and convert to db dates + std::vector dates; + ServerError error_code = SERVER_SUCCESS; + std::string error_msg; + + std::vector<::milvus::grpc::Range> range_array; + range_array.emplace_back(delete_by_range_param_.range()); + ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg); + if (error_code != SERVER_SUCCESS) { + return SetError(error_code, error_msg); + } + +#ifdef MILVUS_ENABLE_PROFILING + std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) + + "_top_" + std::to_string(this->top_k_) + "_" + + GetCurrTimeStr() + ".profiling"; + ProfilerStart(fname.c_str()); +#endif + engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates); + if (!status.ok()) { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } + + } catch (std::exception &ex) { + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); + } + + return SERVER_SUCCESS; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PreloadTableTask::PreloadTableTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index b6f052a5dbe8cb1c500ef6770e5070d407ff92e5..2426846c15c3ffe9321e56915d91ea232372df30 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -9,6 +9,7 @@ #include "db/meta/MetaConsts.h" #include "db/Factories.h" #include "cache/CpuCacheMgr.h" +#include "utils/CommonUtil.h" #include #include @@ -26,6 +27,8 @@ namespace { static constexpr int64_t TABLE_DIM = 256; static constexpr int64_t VECTOR_COUNT = 250000; static constexpr int64_t INSERT_LOOP = 10000; + static constexpr int64_t SECONDS_EACH_HOUR = 3600; + static constexpr int64_t DAY_SECONDS = 24 * 60 * 60; engine::meta::TableSchema BuildTableSchema() { engine::meta::TableSchema table_info; @@ -45,6 +48,52 @@ namespace { } } + std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tt = tt + 24*SECONDS_EACH_HOUR*offset_day; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; + } + + void + ConvertTimeRangeToDBDates(const std::string &start_value, + const std::string &end_value, + std::vector &dates) { + dates.clear(); + + time_t tt_start, tt_end; + tm tm_start, tm_end; + if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(start_value, tt_start, tm_start)) { + return; + } + + if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(end_value, tt_end, tm_end)) { + return; + } + + long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / + DAY_SECONDS; + if (days == 0) { + return; + } + + for (long i = 0; i < days; i++) { + time_t tt_day = tt_start + DAY_SECONDS * i; + tm tm_day; + zilliz::milvus::server::CommonUtil::ConvertTime(tt_day, tm_day); + + long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + + tm_day.tm_mday;//according to db logic + dates.push_back(date); + } + } + } TEST_F(DBTest, CONFIG_TEST) { @@ -313,8 +362,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { }; TEST_F(DBTest2, DELETE_TEST) { - - engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); @@ -349,4 +396,45 @@ TEST_F(DBTest2, DELETE_TEST) { db_->HasTable(TABLE_NAME, has_table); ASSERT_FALSE(has_table); -}; \ No newline at end of file +}; + +TEST_F(DBTest2, DELETE_BY_RANGE_TEST) { + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + + bool has_table = false; + db_->HasTable(TABLE_NAME, has_table); + ASSERT_TRUE(has_table); + + engine::IDNumbers vector_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = INSERT_LOOP; + std::vector xb; + BuildVectors(nb, xb); + + int loop = 20; + for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::vector dates; + std::string start_value = CurrentTmDate(-3); + std::string end_value = CurrentTmDate(-2); + ConvertTimeRangeToDBDates(start_value, end_value, dates); + + db_->DeleteTable(TABLE_NAME, dates); +} \ No newline at end of file