提交 440e18bb 编写于 作者: X xj.lin

MS-137 - Integrate knowhere


Former-commit-id: 67d9be936996437411ac5941c3e322b08a9389bf
上级 92ebfe95
......@@ -3,17 +3,6 @@
Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.3.2 (2019-07-10)
## Bug
## Improvement
## New Feature
- MS-154 - Integrate knowhere
## Task
# Milvus 0.3.1 (2019-07-10)
## Bug
......@@ -21,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA.
## Improvement
## New Feature
- MS-137 - Integrate knowhere
## Task
......
......@@ -16,12 +16,61 @@ namespace zilliz {
namespace milvus {
namespace engine {
struct FileIOWriter {
std::fstream fs;
std::string name;
FileIOWriter(const std::string &fname);
~FileIOWriter();
size_t operator()(void *ptr, size_t size);
};
struct FileIOReader {
std::fstream fs;
std::string name;
FileIOReader(const std::string &fname);
~FileIOReader();
size_t operator()(void *ptr, size_t size);
size_t operator()(void *ptr, size_t size, size_t pos);
};
FileIOReader::FileIOReader(const std::string &fname) {
name = fname;
fs = std::fstream(name, std::ios::in | std::ios::binary);
}
FileIOReader::~FileIOReader() {
fs.close();
}
size_t FileIOReader::operator()(void *ptr, size_t size) {
fs.read(reinterpret_cast<char *>(ptr), size);
}
size_t FileIOReader::operator()(void *ptr, size_t size, size_t pos) {
return 0;
}
FileIOWriter::FileIOWriter(const std::string &fname) {
name = fname;
fs = std::fstream(name, std::ios::out | std::ios::binary);
}
FileIOWriter::~FileIOWriter() {
fs.close();
}
size_t FileIOWriter::operator()(void *ptr, size_t size) {
fs.write(reinterpret_cast<char *>(ptr), size);
}
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
const std::string &location,
EngineType type)
: location_(location), dim(dimension), build_type(type) {
index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
current_type = EngineType::FAISS_IDMAP;
std::static_pointer_cast<BFIndex>(index_)->Build(dimension);
}
......@@ -29,6 +78,7 @@ ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
const std::string &location,
EngineType type)
: index_(std::move(index)), location_(location), build_type(type) {
current_type = type;
}
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
......@@ -80,26 +130,85 @@ size_t ExecutionEngineImpl::PhysicalSize() const {
}
Status ExecutionEngineImpl::Serialize() {
// TODO(groot):
auto binaryset = index_->Serialize();
FileIOWriter writer(location_);
writer(&current_type, sizeof(current_type));
for (auto &iter: binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
writer(&meta_length, sizeof(meta_length));
writer((void *) meta, meta_length);
auto binary = iter.second;
size_t binary_length = binary->size;
writer(&binary_length, sizeof(binary_length));
writer((void *) binary->data.get(), binary_length);
}
return Status::OK();
}
Status ExecutionEngineImpl::Load() {
// TODO(groot):
index_ = Load(location_);
return Status::OK();
}
VecIndexPtr ExecutionEngineImpl::Load(const std::string &location) {
// TODO(groot): dev func in Fake code
// pseude code
//auto data = read_file(location);
//auto index_type = get_index_type(data);
//auto binaryset = get_index_binary(data);
/////
//return LoadVecIndex(index_type, binaryset);
return nullptr;
knowhere::BinarySet load_data_list;
FileIOReader reader(location);
reader.fs.seekg(0, reader.fs.end);
size_t length = reader.fs.tellg();
reader.fs.seekg(0);
size_t rp = 0;
reader(&current_type, sizeof(current_type));
rp += sizeof(current_type);
while (rp < length) {
size_t meta_length;
reader(&meta_length, sizeof(meta_length));
rp += sizeof(meta_length);
reader.fs.seekg(rp);
auto meta = new char[meta_length];
reader(meta, meta_length);
rp += meta_length;
reader.fs.seekg(rp);
size_t bin_length;
reader(&bin_length, sizeof(bin_length));
rp += sizeof(bin_length);
reader.fs.seekg(rp);
auto bin = new uint8_t[bin_length];
reader(bin, bin_length);
rp += bin_length;
auto xx = std::make_shared<uint8_t>();
xx.reset(bin);
load_data_list.Append(std::string(meta, meta_length), xx, bin_length);
}
auto index_type = IndexType::INVALID;
switch (current_type) {
case EngineType::FAISS_IDMAP: {
index_type = IndexType::FAISS_IDMAP;
break;
}
case EngineType::FAISS_IVFFLAT_CPU: {
index_type = IndexType::FAISS_IVFFLAT_CPU;
break;
}
case EngineType::FAISS_IVFFLAT_GPU: {
index_type = IndexType::FAISS_IVFFLAT_GPU;
break;
}
case EngineType::SPTAG_KDT_RNT_CPU: {
index_type = IndexType::SPTAG_KDT_RNT_CPU;
break;
}
}
return LoadVecIndex(index_type, load_data_list);
}
Status ExecutionEngineImpl::Merge(const std::string &location) {
......@@ -113,7 +222,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
to_merge = Load(location);
}
auto file_index = std::dynamic_pointer_cast<BFIndex>(index_);
auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge);
index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
return Status::OK();
}
......
......@@ -64,6 +64,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
protected:
VecIndexPtr index_ = nullptr;
EngineType build_type;
EngineType current_type;
int64_t dim;
std::string location_;
......
......@@ -151,7 +151,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
//step 1: allocate memory
auto inner_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
auto inner_k = context->topk();
output_ids.resize(inner_k*context->nq());
output_distence.resize(inner_k*context->nq());
......@@ -164,7 +164,8 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
//step 3: cluster result
SearchContext::ResultSet result_set;
ClusterResult(output_ids, output_distence, context->nq(), inner_k, result_set);
auto spec_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
ClusterResult(output_ids, output_distence, context->nq(), spec_k, result_set);
rc.Record("cluster result");
//step 4: pick up topk result
......
......@@ -12,8 +12,8 @@
//#include <memory>
//#include <fstream>
//
//#include "faiss/AutoTune.h"
//#include "faiss/index_io.h"
#include "faiss/AutoTune.h"
#include "faiss/index_io.h"
//
//#include "Operand.h"
......
......@@ -4,6 +4,7 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <src/utils/Log.h>
#include "knowhere/index/vector_index/idmap.h"
#include "vec_impl.h"
......@@ -27,7 +28,9 @@ void VecIndexImpl::BuildAll(const long &nb,
auto preprocessor = index_->BuildPreprocessor(dataset, cfg);
index_->set_preprocessor(preprocessor);
auto model = index_->Train(dataset, cfg);
auto nlist = int(nb / 1000000.0 * 16384);
auto cfg_t = Config::object{{"nlist", nlist}, {"dim", dim}};
auto model = index_->Train(dataset, cfg_t);
index_->set_index_model(model);
index_->Add(dataset, cfg);
}
......@@ -71,7 +74,7 @@ void VecIndexImpl::Search(const long &nq, const float *xq, float *dist, long *id
//}
auto p_ids = ids_array->data()->GetValues<int64_t>(1, 0);
auto p_dist = ids_array->data()->GetValues<float>(1, 0);
auto p_dist = dis_array->data()->GetValues<float>(1, 0);
// TODO(linxj): avoid copy here.
memcpy(ids, p_ids, sizeof(int64_t) * nq * k);
......@@ -84,6 +87,7 @@ zilliz::knowhere::BinarySet VecIndexImpl::Serialize() {
void VecIndexImpl::Load(const zilliz::knowhere::BinarySet &index_binary) {
index_->Load(index_binary);
dim = Dimension();
}
int64_t VecIndexImpl::Dimension() {
......@@ -95,7 +99,9 @@ int64_t VecIndexImpl::Count() {
}
float *BFIndex::GetRawVectors() {
return std::static_pointer_cast<IDMAP>(index_)->GetRawVectors();
auto raw_index = std::dynamic_pointer_cast<IDMAP>(index_);
if (raw_index) { return raw_index->GetRawVectors(); }
return nullptr;
}
int64_t *BFIndex::GetRawIds() {
......@@ -107,6 +113,19 @@ void BFIndex::Build(const int64_t &d) {
std::static_pointer_cast<IDMAP>(index_)->Train(dim);
}
void BFIndex::BuildAll(const long &nb,
const float *xb,
const long *ids,
const Config &cfg,
const long &nt,
const float *xt) {
dim = cfg["dim"].as<int>();
auto dataset = GenDatasetWithIds(nb, dim, xb, ids);
std::static_pointer_cast<IDMAP>(index_)->Train(dim);
index_->Add(dataset, cfg);
}
}
}
}
......@@ -32,7 +32,7 @@ class VecIndexImpl : public VecIndex {
void Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) override;
protected:
int64_t dim;
int64_t dim = 0;
std::shared_ptr<zilliz::knowhere::VectorIndex> index_ = nullptr;
};
......@@ -41,6 +41,12 @@ class BFIndex : public VecIndexImpl {
explicit BFIndex(std::shared_ptr<zilliz::knowhere::VectorIndex> index) : VecIndexImpl(std::move(index)) {};
void Build(const int64_t& d);
float* GetRawVectors();
void BuildAll(const long &nb,
const float *xb,
const long *ids,
const Config &cfg,
const long &nt,
const float *xt) override;
int64_t* GetRawIds();
};
......
knowhere @ 3a30677b
Subproject commit c0df766214d7fa288ffedd77cd06a8ba8620c8df
Subproject commit 3a30677b8ab105955534922d1677e8fa99ef0406
......@@ -38,7 +38,7 @@ set(unittest_libs
${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
)
add_subdirectory(server)
#add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(index_wrapper)
#add_subdirectory(faiss_wrapper)
......
......@@ -6,7 +6,7 @@
aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
#aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src)
aux_source_directory(./ test_srcs)
......
......@@ -26,32 +26,32 @@ namespace {
}
TEST(DBMiscTest, ENGINE_API_TEST) {
//engine api AddWithIdArray
const uint16_t dim = 512;
const long n = 10;
engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat");
std::vector<float> vectors;
std::vector<long> ids;
for (long i = 0; i < n; i++) {
for (uint16_t k = 0; k < dim; k++) {
vectors.push_back((float) k);
}
ids.push_back(i);
}
auto status = engine.AddWithIdArray(vectors, ids);
ASSERT_TRUE(status.ok());
auto engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::INVALID);
ASSERT_EQ(engine_ptr, nullptr);
engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IVFFLAT_GPU);
ASSERT_NE(engine_ptr, nullptr);
engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IDMAP);
ASSERT_NE(engine_ptr, nullptr);
}
//TEST(DBMiscTest, ENGINE_API_TEST) {
// //engine api AddWithIdArray
// const uint16_t dim = 512;
// const long n = 10;
// engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat");
// std::vector<float> vectors;
// std::vector<long> ids;
// for (long i = 0; i < n; i++) {
// for (uint16_t k = 0; k < dim; k++) {
// vectors.push_back((float) k);
// }
// ids.push_back(i);
// }
//
// auto status = engine.AddWithIdArray(vectors, ids);
// ASSERT_TRUE(status.ok());
//
// auto engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::INVALID);
// ASSERT_EQ(engine_ptr, nullptr);
//
// engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IVFFLAT_GPU);
// ASSERT_NE(engine_ptr, nullptr);
//
// engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IDMAP);
// ASSERT_NE(engine_ptr, nullptr);
//}
TEST(DBMiscTest, EXCEPTION_TEST) {
engine::Exception ex1("");
......
......@@ -11,7 +11,7 @@
#include "utils.h"
using namespace zilliz::vecwise::engine;
using namespace zilliz::milvus::engine;
using namespace zilliz::knowhere;
using ::testing::TestWithParam;
......@@ -20,7 +20,7 @@ using ::testing::Combine;
class KnowhereWrapperTest
: public TestWithParam<::std::tuple<std::string, std::string, int, int, int, int, Config, Config>> {
: public TestWithParam<::std::tuple<IndexType, std::string, int, int, int, int, Config, Config>> {
protected:
void SetUp() override {
std::string generator_type;
......@@ -34,7 +34,7 @@ class KnowhereWrapperTest
}
protected:
std::string index_type;
IndexType index_type;
Config train_cfg;
Config search_cfg;
......@@ -55,12 +55,12 @@ class KnowhereWrapperTest
INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
Values(
// ["Index type", "Generator type", "dim", "nb", "nq", "k", "build config", "search config"]
std::make_tuple("IVF", "Default",
std::make_tuple(IndexType::FAISS_IVFFLAT_CPU, "Default",
64, 10000, 10, 10,
Config::object{{"nlist", 100}, {"dim", 64}},
Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 20}}
),
std::make_tuple("SPTAG", "Default",
std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default",
64, 10000, 10, 10,
Config::object{{"TPTNumber", 1}, {"dim", 64}},
Config::object{{"dim", 64}, {"k", 10}}
......@@ -113,16 +113,39 @@ TEST_P(KnowhereWrapperTest, serialize_test) {
{
auto binaryset = index_->Serialize();
//int fileno = 0;
//const std::string &base_name = "/tmp/wrapper_serialize_test_bin_";
//std::vector<std::string> filename_list;
//std::vector<std::pair<std::string, size_t >> meta_list;
//for (auto &iter: binaryset.binary_map_) {
// const std::string &filename = base_name + std::to_string(fileno);
// FileIOWriter writer(filename);
// writer(iter.second->data.get(), iter.second->size);
//
// meta_list.push_back(std::make_pair(iter.first, iter.second.size));
// filename_list.push_back(filename);
// ++fileno;
//}
//
//BinarySet load_data_list;
//for (int i = 0; i < filename_list.size() && i < meta_list.size(); ++i) {
// auto bin_size = meta_list[i].second;
// FileIOReader reader(filename_list[i]);
// std::vector<uint8_t> load_data(bin_size);
// reader(load_data.data(), bin_size);
// load_data_list.Append(meta_list[i].first, load_data);
//}
int fileno = 0;
const std::string &base_name = "/tmp/wrapper_serialize_test_bin_";
std::vector<std::string> filename_list;
const std::string &base_name = "/tmp/wrapper_serialize_test_bin_";
std::vector<std::pair<std::string, size_t >> meta_list;
for (auto &iter: binaryset.binary_map_) {
const std::string &filename = base_name + std::to_string(fileno);
FileIOWriter writer(filename);
writer(iter.second.data, iter.second.size);
writer(iter.second->data.get(), iter.second->size);
meta_list.push_back(std::make_pair(iter.first, iter.second.size));
meta_list.emplace_back(std::make_pair(iter.first, iter.second->size));
filename_list.push_back(filename);
++fileno;
}
......@@ -131,9 +154,12 @@ TEST_P(KnowhereWrapperTest, serialize_test) {
for (int i = 0; i < filename_list.size() && i < meta_list.size(); ++i) {
auto bin_size = meta_list[i].second;
FileIOReader reader(filename_list[i]);
std::vector<uint8_t> load_data(bin_size);
reader(load_data.data(), bin_size);
load_data_list.Append(meta_list[i].first, load_data);
auto load_data = new uint8_t[bin_size];
reader(load_data, bin_size);
auto data = std::make_shared<uint8_t>();
data.reset(load_data);
load_data_list.Append(meta_list[i].first, data, bin_size);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册