提交 e8707d64 编写于 作者: G groot

Merge branch 'jinhai' of 192.168.1.105:jinhai/vecwise_engine into jinhai


Former-commit-id: 0f2498cbd94b36dd35305d984d6a58feb65dfb4c
......@@ -19,12 +19,12 @@ public:
virtual Status add_group(const GroupOptions& options_,
const std::string& group_id_,
GroupSchema& group_info_) = 0;
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) = 0;
meta::GroupSchema& group_info_) = 0;
virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) = 0;
meta::GroupFilesSchema& group_files_info_) = 0;
virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
......
#include <assert.h>
#include <chrono>
#include <thread>
#include <iostream>
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
#include "db_impl.h"
#include "db_meta_impl.h"
#include "env.h"
......@@ -15,21 +19,21 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_)
_options(options_),
_bg_compaction_scheduled(false),
_shutting_down(false),
_pMeta(new DBMetaImpl(*(_options.pMetaOptions))),
_pMeta(new meta::DBMetaImpl(*(_options.pMetaOptions))),
_pMemMgr(new MemManager(_pMeta)) {
start_timer_task(options_.memory_sync_interval);
}
Status DBImpl::add_group(const GroupOptions& options,
const std::string& group_id,
GroupSchema& group_info) {
meta::GroupSchema& group_info) {
assert((!options.has_id) ||
(options.has_id && ("" != group_id)));
return _pMeta->add_group(options, group_id, group_info);
}
Status DBImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) {
Status DBImpl::get_group(const std::string& group_id_, meta::GroupSchema& group_info_) {
return _pMeta->get_group(group_id_, group_info_);
}
......@@ -39,7 +43,7 @@ Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
Status DBImpl::get_group_files(const std::string& group_id,
const int date_delta,
GroupFilesSchema& group_files_info) {
meta::GroupFilesSchema& group_files_info) {
return _pMeta->get_group_files(group_id, date_delta, group_files_info);
}
......@@ -99,8 +103,64 @@ void DBImpl::background_call() {
_bg_work_finish_signal.notify_all();
}
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
const meta::GroupFilesSchema& files) {
meta::GroupFileSchema group_file;
Status status = _pMeta->add_group_file(group_id, date, group_file);
if (!status.ok()) {
return status;
}
faiss::IndexFlat innerIndex(group_file.dimension);
faiss::IndexIDMap index(&innerIndex);
meta::GroupFilesSchema updated;
for (auto& file : files) {
auto file_index = dynamic_cast<faiss::IndexIDMap*>(faiss::read_index(file.location.c_str()));
index.add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
file_index->id_map.data());
auto file_schema = file;
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
updated.push_back(file_schema);
}
faiss::write_index(&index, group_file.location.c_str());
group_file.file_type = meta::GroupFileSchema::RAW;
updated.push_back(group_file);
status = _pMeta->update_files(updated);
return status;
}
Status DBImpl::background_merge_files(const std::string& group_id) {
meta::DatePartionedGroupFilesSchema raw_files;
/* auto status = _pMeta->get_small_raw_files(group_id, raw_files); */
/* if (!status.ok()) { */
/* _bg_error = status; */
/* return status; */
/* } */
if (raw_files.size() == 0) {
return Status::OK();
}
for (auto& kv : raw_files) {
merge_files(group_id, kv.first, kv.second);
}
}
void DBImpl::background_compaction() {
_pMemMgr->serialize();
std::vector<std::string> group_ids;
_pMemMgr->serialize(group_ids);
for (auto group_id : group_ids) {
std::cout << __func__ << " group_id=" << group_id << std::endl;
}
if (group_ids.size() > 0) {
}
}
DBImpl::~DBImpl() {
......
......@@ -14,19 +14,23 @@ namespace engine {
class Env;
namespace meta {
class Meta;
}
class DBImpl : public DB {
public:
DBImpl(const Options& options_, const std::string& name_);
virtual Status add_group(const GroupOptions& options_,
const std::string& group_id_,
GroupSchema& group_info_) override;
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override;
meta::GroupSchema& group_info_) override;
virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
meta::GroupFilesSchema& group_files_info_) override;
virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) override;
......@@ -37,6 +41,10 @@ public:
virtual ~DBImpl();
private:
Status merge_files(const std::string& group_id,
const meta::DateT& date,
const meta::GroupFilesSchema& files);
Status background_merge_files(const std::string& group_id);
void try_schedule_compaction();
void start_timer_task(int interval_);
......@@ -56,7 +64,7 @@ private:
Status _bg_error;
std::atomic<bool> _shutting_down;
std::shared_ptr<Meta> _pMeta;
std::shared_ptr<meta::Meta> _pMeta;
std::shared_ptr<MemManager> _pMemMgr;
}; // DBImpl
......
#include <ctime>
#include "db_meta.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
DateT Meta::GetDate(const std::time_t& t) {
tm *ltm = std::localtime(&t);
return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday;
}
DateT Meta::GetDate() {
return GetDate(std::time(nullptr));
}
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -3,12 +3,17 @@
#include <string>
#include <cstddef>
#include <vector>
#include <map>
#include <ctime>
#include "options.h"
#include "status.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
typedef int DateT;
struct GroupSchema {
size_t id;
......@@ -22,19 +27,24 @@ struct GroupSchema {
struct GroupFileSchema {
typedef enum {
NEW,
RAW,
INDEX
INDEX,
TO_DELETE,
} FILE_TYPE;
size_t id;
std::string group_id;
std::string file_id;
int files_type = RAW;
int file_type = NEW;
size_t rows;
DateT date;
uint16_t dimension;
std::string location = "";
}; // GroupFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;
class Meta {
......@@ -47,6 +57,9 @@ public:
virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_) = 0;
virtual Status add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) = 0;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) = 0;
......@@ -59,8 +72,14 @@ public:
const int date_delta_,
GroupFilesSchema& group_files_info_) = 0;
virtual Status update_files(const GroupFilesSchema& files) = 0;
static DateT GetDate(const std::time_t& t);
static DateT GetDate();
}; // MetaData
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -6,6 +6,7 @@
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
DBMetaImpl::DBMetaImpl(const MetaOptions& options_)
: _options(static_cast<const DBMetaOptions&>(options_)) {
......@@ -26,12 +27,6 @@ Status DBMetaImpl::add_group(const GroupOptions& options_,
Status DBMetaImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) {
//PXU TODO
std::stringstream ss;
SimpleIDGenerator g;
ss.str("");
ss << "/tmp/test/" << g.getNextIDNumber() << ".log";
group_info_.dimension = 64;
group_info_.next_file_location = ss.str();
return Status::OK();
}
......@@ -40,9 +35,24 @@ Status DBMetaImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
return Status::OK();
}
Status DBMetaImpl::add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_) {
Status DBMetaImpl::add_group_file(const std::string& group_id,
GroupFileSchema& group_file_info) {
return add_group_file(group_id, Meta::GetDate(), group_file_info);
}
Status DBMetaImpl::add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) {
//PXU TODO
std::stringstream ss;
SimpleIDGenerator g;
ss << "/tmp/test/" << date
<< "/" << g.getNextIDNumber()
<< ".log";
group_file_info.group_id = "1";
group_file_info.dimension = 64;
group_file_info.location = ss.str();
group_file_info.date = date;
return Status::OK();
}
......@@ -72,6 +82,12 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
return Status::OK();
}
Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
//PXU TODO
return Status::OK();
}
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -7,6 +7,7 @@
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
class DBMetaImpl : public Meta {
public:
......@@ -18,6 +19,9 @@ public:
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) override;
virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_) override;
virtual Status has_group_file(const std::string& group_id_,
......@@ -32,6 +36,8 @@ public:
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status update_files(const GroupFilesSchema& files) override;
private:
Status initialize();
......@@ -40,6 +46,7 @@ private:
}; // DBMetaImpl
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......
......@@ -13,10 +13,12 @@ namespace zilliz {
namespace vecwise {
namespace engine {
MemVectors::MemVectors(size_t dimension_, const std::string& file_location_) :
_file_location(file_location_),
MemVectors::MemVectors(const std::string& group_id,
size_t dimension, const std::string& file_location) :
group_id_(group_id),
_file_location(file_location),
_pIdGenerator(new SimpleIDGenerator()),
_dimension(dimension_),
_dimension(dimension),
_pInnerIndex(new faiss::IndexFlat(_dimension)),
_pIdMapIndex(new faiss::IndexIDMap(_pInnerIndex)) {
}
......@@ -37,13 +39,15 @@ size_t MemVectors::approximate_size() const {
return total() * _dimension;
}
void MemVectors::serialize() {
Status MemVectors::serialize(std::string& group_id) {
/* std::stringstream ss; */
/* ss << "/tmp/test/" << _pIdGenerator->getNextIDNumber(); */
/* faiss::write_index(_pIdMapIndex, ss.str().c_str()); */
/* std::cout << _pIdMapIndex->ntotal << std::endl; */
/* std::cout << _file_location << std::endl; */
faiss::write_index(_pIdMapIndex, _file_location.c_str());
group_id = group_id_;
return Status::OK();
}
MemVectors::~MemVectors() {
......@@ -71,14 +75,15 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) {
return memIt->second;
}
GroupSchema group_info;
Status status = _pMeta->get_group(group_id, group_info);
meta::GroupFileSchema group_file;
auto status = _pMeta->add_group_file(group_id, group_file);
if (!status.ok()) {
return nullptr;
}
_memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(group_info.dimension,
group_info.next_file_location));
_memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(group_file.group_id,
group_file.dimension,
group_file.location));
return _memMap[group_id];
}
......@@ -126,10 +131,13 @@ Status MemManager::mark_memory_as_immutable() {
/* return false; */
/* } */
Status MemManager::serialize() {
Status MemManager::serialize(std::vector<std::string>& group_ids) {
mark_memory_as_immutable();
std::string group_id;
group_ids.clear();
for (auto& mem : _immMems) {
mem->serialize();
mem->serialize(group_id);
group_ids.push_back(group_id);
}
_immMems.clear();
return Status::OK();
......
......@@ -19,9 +19,15 @@ namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
class Meta;
}
class MemVectors {
public:
explicit MemVectors(size_t dimension_, const std::string& file_location_);
explicit MemVectors(const std::string& group_id,
size_t dimension,
const std::string& file_location);
void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
......@@ -29,7 +35,7 @@ public:
size_t approximate_size() const;
void serialize();
Status serialize(std::string& group_id);
~MemVectors();
......@@ -40,6 +46,7 @@ private:
MemVectors(const MemVectors&) = delete;
MemVectors& operator=(const MemVectors&) = delete;
std::string group_id_;
const std::string _file_location;
IDGenerator* _pIdGenerator;
size_t _dimension;
......@@ -49,12 +56,11 @@ private:
}; // MemVectors
class Meta;
typedef std::shared_ptr<MemVectors> VectorsPtr;
class MemManager {
public:
MemManager(const std::shared_ptr<Meta>& meta_)
MemManager(const std::shared_ptr<meta::Meta>& meta_)
: _pMeta(meta_) /*_last_compact_time(std::time(nullptr))*/ {}
VectorsPtr get_mem_by_group(const std::string& group_id_);
......@@ -62,7 +68,7 @@ public:
Status add_vectors(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status serialize();
Status serialize(std::vector<std::string>& group_ids);
private:
Status add_vectors_no_lock(const std::string& group_id_,
......@@ -73,7 +79,7 @@ private:
typedef std::vector<VectorsPtr> ImmMemPool;
MemMap _memMap;
ImmMemPool _immMems;
std::shared_ptr<Meta> _pMeta;
std::shared_ptr<meta::Meta> _pMeta;
/* std::time_t _last_compact_time; */
std::mutex _mutex;
}; // MemManager
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册