提交 1c1fa21e 编写于 作者: G groot

MS-96 add new query interface for specified files


Former-commit-id: 87f4c6ac2216b7c3c3cf9cf5a24d0438040d6841
上级 55754a0e
......@@ -4,6 +4,7 @@ server_config:
transfer_protocol: binary #optional: binary, compact, json
server_mode: thread_pool #optional: simple, thread_pool
gpu_index: 0 #which gpu to be used
mode: local #optional: local, cluster
db_config:
db_path: /tmp/milvus
......
......@@ -227,15 +227,18 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
uint64_t k, uint64_t nq, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
//get specified files
meta::TableFilesSchema files_array;
std::vector<size_t> ids;
for (auto &id : file_ids) {
meta::TableFileSchema table_file;
table_file.table_id_ = id;
auto status = meta_ptr_->GetTableFile(table_file);
if (!status.ok()) {
return status;
}
files_array.emplace_back(table_file);
table_file.table_id_ = table_id;
std::string::size_type sz;
ids.push_back(std::stol(id, &sz));
}
meta::TableFilesSchema files_array;
auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
if (!status.ok()) {
return status;
}
return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
......
......@@ -604,32 +604,41 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
return Status::OK();
}
Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
Status DBMetaImpl::GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) {
try {
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
table_files.clear();
auto files = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::size_,
&TableFileSchema::date_),
where(c(&TableFileSchema::file_id_) == file_schema.file_id_ and
c(&TableFileSchema::table_id_) == file_schema.table_id_
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::id_, ids)
));
assert(files.size() <= 1);
if (files.size() == 1) {
file_schema.id_ = std::get<0>(files[0]);
file_schema.table_id_ = std::get<1>(files[0]);
file_schema.file_id_ = std::get<2>(files[0]);
file_schema.file_type_ = std::get<3>(files[0]);
file_schema.size_ = std::get<4>(files[0]);
file_schema.date_ = std::get<5>(files[0]);
} else {
return Status::NotFound("Table:" + file_schema.table_id_ +
" File:" + file_schema.file_id_ + " not found");
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
for (auto &file : files) {
TableFileSchema file_schema;
file_schema.file_id_ = std::get<0>(file);
file_schema.file_type_ = std::get<1>(file);
file_schema.size_ = std::get<2>(file);
file_schema.date_ = std::get<3>(file);
file_schema.engine_type_ = std::get<4>(file);
file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema);
table_files.emplace_back(file_schema);
}
} catch (std::exception &e) {
return HandleException("Encounter exception when lookup table file", e);
return HandleException("Encounter exception when lookup table files", e);
}
return Status::OK();
......
......@@ -31,7 +31,9 @@ public:
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
virtual Status GetTableFile(TableFileSchema& file_schema) override;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
......
......@@ -35,14 +35,17 @@ public:
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) = 0;
virtual Status GetTableFile(TableFileSchema& file_schema) = 0;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) = 0;
virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0;
virtual Status UpdateTableFiles(TableFilesSchema& files) = 0;
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) = 0;
virtual Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema& files) = 0;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) = 0;
......
......@@ -85,9 +85,13 @@ TEST_F(MetaTest, table_file_TEST) {
dates.push_back(table_file.date_);
status = impl_->DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_TRUE(status.ok());
status = impl_->GetTableFile(table_file);
std::vector<size_t> ids = {table_file.id_};
meta::TableFilesSchema files;
status = impl_->GetTableFiles(table_file.table_id_, ids, files);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
ASSERT_EQ(files.size(), 1UL);
ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE);
}
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
......@@ -113,6 +117,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
auto cnt = 100;
long ts = utils::GetMicroSecTimeStamp();
std::vector<int> days;
std::vector<size_t> ids;
for (auto i=0; i<cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
......@@ -121,14 +126,17 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
days.push_back(day);
ids.push_back(table_file.id_);
}
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
meta::TableFilesSchema files_get;
status = impl.GetTableFiles(table_file.table_id_, ids, files_get);
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
} else {
......@@ -158,20 +166,24 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
auto cnt = 10;
auto each_size = 2UL;
std::vector<size_t> ids;
for (auto i=0; i<cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
table_file.size_ = each_size * meta::G;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
ids.push_back(table_file.id_);
}
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
meta::TableFilesSchema files_get;
status = impl.GetTableFiles(table_file.table_id_, ids, files_get);
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (i < 5) {
ASSERT_TRUE(file.file_type_ == meta::TableFileSchema::TO_DELETE);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册