From a6e92dc9ab6882c4dce4155e6c5365759d4cf506 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Thu, 18 Apr 2019 21:00:31 +0800 Subject: [PATCH] feat(db): update for merge and index Former-commit-id: b06f36ce5a8ebca0f1f4554437b19233f8f6f723 --- cpp/src/db/DBImpl.cpp | 9 +++++++- cpp/src/db/DBMetaImpl.cpp | 41 +++++++++++++++++++++++++++++++++++- cpp/src/db/DBMetaImpl.h | 4 ++++ cpp/src/db/LocalMetaImpl.cpp | 5 +++++ cpp/src/db/LocalMetaImpl.h | 2 ++ cpp/src/db/Meta.h | 2 ++ cpp/src/db/Options.h | 2 +- 7 files changed, 62 insertions(+), 3 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index e2246c95..b92518d6 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -124,8 +124,15 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, updated.push_back(file_schema); } + auto index_size = group_file.dimension * index->ntotal; faiss::write_index(index.get(), group_file.location.c_str()); - group_file.file_type = meta::GroupFileSchema::RAW; + + std::cout << "Merged size=" << index_size << std::endl; + if (index_size >= _options.index_trigger_size) { + group_file.file_type = meta::GroupFileSchema::TO_INDEX; + } else { + group_file.file_type = meta::GroupFileSchema::RAW; + } updated.push_back(group_file); status = _pMeta->update_files(updated); diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index ed0382b1..7a0bef88 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -33,7 +33,7 @@ inline auto StoragePrototype(const std::string& path) { } -using ConnectorT = decltype(StoragePrototype("")); +using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3")); static std::unique_ptr ConnectorPtr; long GetFileSize(const std::string& filename) @@ -77,6 +77,8 @@ Status DBMetaImpl::initialize() { ConnectorPtr->sync_schema(); + cleanup(); + return Status::OK(); } @@ -211,6 +213,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { group_file.file_type = 
std::get<3>(file); group_file.rows = std::get<4>(file); group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); auto groupItr = groups.find(group_file.group_id); if (groupItr == groups.end()) { GroupSchema group_info; @@ -316,6 +319,42 @@ Status DBMetaImpl::update_files(const GroupFilesSchema& files) { return Status::OK(); } +Status DBMetaImpl::cleanup() { + std::unique_lock lk(mutex_); + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or + c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW)); + + GroupFilesSchema updated; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + if (group_file.file_type == GroupFileSchema::TO_DELETE) { + boost::filesystem::remove(group_file.location); + } + ConnectorPtr->remove(group_file.id); + std::cout << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; + } + + return Status::OK(); +} + +DBMetaImpl::~DBMetaImpl() { + cleanup(); +} + } // namespace meta } // namespace engine } // namespace vecwise diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 3011c700..5fa10e98 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -42,6 +42,10 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status cleanup() override; + + virtual ~DBMetaImpl(); + private: Status get_group_no_lock(GroupSchema& group_info); diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index d39b2fee..7bdb5076 100644 
--- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -236,6 +236,11 @@ Status LocalMetaImpl::update_files(const GroupFilesSchema& files) { return Status::OK(); } +Status LocalMetaImpl::cleanup() { + //PXU TODO + return Status::OK(); +} + } // namespace meta } // namespace engine } // namespace vecwise diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index bd64970c..f74916c0 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -34,6 +34,8 @@ public: virtual Status update_files(const GroupFilesSchema& files) override; + virtual Status cleanup() override; + virtual Status files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) override; diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 510cfc83..84ba4147 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -75,6 +75,8 @@ public: virtual Status files_to_index(GroupFilesSchema&) = 0; + virtual Status cleanup() = 0; + static DateT GetDate(const std::time_t& t); static DateT GetDate(); diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index fd64f20e..1af20e42 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -20,7 +20,7 @@ struct Options { Options(); uint16_t memory_sync_interval = 10; uint16_t merge_trigger_number = 100; - size_t index_trigger_size = 1024*1024*1024; + size_t index_trigger_size = 1024*1024*256; Env* env; DBMetaOptions meta; }; // Options -- GitLab