提交 1bc01da8 编写于 作者: X Xu Peng 提交者: xj.lin

feat(cpp/db): add timer thread


Former-commit-id: e94c2675e8e4294620c94669fa1d3208af7fcc40
上级 3e8213ff
#include <assert.h> #include <assert.h>
#include <chrono>
#include "db_impl.h" #include "db_impl.h"
namespace vecengine { namespace vecengine {
...@@ -9,8 +10,10 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_) ...@@ -9,8 +10,10 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_)
_options(options_), _options(options_),
_bg_work_finish_signal(_mutex), _bg_work_finish_signal(_mutex),
_bg_compaction_scheduled(false), _bg_compaction_scheduled(false),
_shutting_down(false),
_pMeta(new DBMetaImpl(*(_options.pMetaOptions))), _pMeta(new DBMetaImpl(*(_options.pMetaOptions))),
_pMemMgr(new MemManager(_pMeta)) { _pMemMgr(new MemManager(_pMeta)) {
start_timer_task(Options.memory_sync_interval);
} }
Status DBImpl::add_group(const GroupOptions& options_, Status DBImpl::add_group(const GroupOptions& options_,
...@@ -39,7 +42,27 @@ Status DBImpl::get_group_files(const std::string& group_id_, ...@@ -39,7 +42,27 @@ Status DBImpl::get_group_files(const std::string& group_id_,
Status DBImpl::add_vectors(const std::string& group_id_, Status DBImpl::add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) { size_t n, const float* vectors, IDNumbers& vector_ids_) {
return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
if (!status.ok()) {
return status;
}
}
void DBImpl::start_timer_task(int interval_) {
std::thread bg_task(&DBImpl::background_timer_task, this, interval_);
bg_task.detach();
}
void DBImpl::background_timer_task(int interval_) {
Status status;
while (true) {
if (!_bg_error.ok()) break;
if (_shutting_down.load(std::memory_order_acquire)) break;
std::this_thread::sleep_for(std::chrono::seconds(interval_));
try_schedule_compaction();
}
} }
void DBImpl::try_schedule_compaction() { void DBImpl::try_schedule_compaction() {
...@@ -61,14 +84,21 @@ void DBImpl::background_call() { ...@@ -61,14 +84,21 @@ void DBImpl::background_call() {
if (!_bg_error.ok()) return; if (!_bg_error.ok()) return;
background_compaction(); background_compaction();
_bg_compaction_scheduled = false;
_bg_work_finish_signal.notify_all();
} }
void DBImpl::background_compaction() { void DBImpl::background_compaction() {
_pMemMgr->serialize();
} }
void DBImpl::compact_memory() { DBImpl::~DBImpl() {
std::lock_guard<std::mutex> _mutex;
_shutting_down.store(true, std::memory_order_release);
while (_bg_compaction_scheduled) {
_bg_work_finish_signal.wait();
}
} }
/* /*
......
...@@ -49,6 +49,7 @@ private: ...@@ -49,6 +49,7 @@ private:
std::condition_variable _bg_work_finish_signal; std::condition_variable _bg_work_finish_signal;
bool _bg_compaction_scheduled; bool _bg_compaction_scheduled;
Status _bg_error; Status _bg_error;
std::atomic<bool> _shutting_down;
std::shared_ptr<Meta> _pMeta; std::shared_ptr<Meta> _pMeta;
std::shared_ptr<MemManager> _pMemMgr; std::shared_ptr<MemManager> _pMemMgr;
......
...@@ -52,7 +52,7 @@ MemVectors::~MemVectors() { ...@@ -52,7 +52,7 @@ MemVectors::~MemVectors() {
* MemManager * MemManager
*/ */
MemVectors* MemManager::get_mem_by_group(const std::string& group_id_) { VectorsPtr MemManager::get_mem_by_group(const std::string& group_id_) {
auto memIt = _memMap.find(group_id_); auto memIt = _memMap.find(group_id_);
if memIt != _memMap.end() { if memIt != _memMap.end() {
return &(memIt->second); return &(memIt->second);
...@@ -63,15 +63,16 @@ MemVectors* MemManager::get_mem_by_group(const std::string& group_id_) { ...@@ -63,15 +63,16 @@ MemVectors* MemManager::get_mem_by_group(const std::string& group_id_) {
if (!status.ok()) { if (!status.ok()) {
return nullptr; return nullptr;
} }
_memMap[group_id] = MemVectors(group_info.dimension, group_info.next_file_location); _memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(group_info.dimension,
return &(_memMap[group_id]); group_info.next_file_location));
return _memMap[group_id];
} }
Status MemManager::add_vectors(const std::string& group_id_, Status MemManager::add_vectors(const std::string& group_id_,
size_t n_, size_t n_,
const float* vectors_, const float* vectors_,
IDNumbers& vector_ids_) { IDNumbers& vector_ids_) {
// PXU TODO std::lock_guard<std::mutex> lock(_mutex);
return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_); return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_);
} }
...@@ -86,5 +87,35 @@ Status MemManager::add_vectors_no_lock(const std::string& group_id_, ...@@ -86,5 +87,35 @@ Status MemManager::add_vectors_no_lock(const std::string& group_id_,
return mem->add(n, vectors, vector_ids_); return mem->add(n, vectors, vector_ids_);
} }
Status MemManager::mark_memory_as_immutable() {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& kv: _memMap) {
_immMems.push_back(kv.second);
}
_memMap.clear();
}
/* bool MemManager::need_serialize(double interval) { */
/* if (_immMems.size() > 0) { */
/* return false; */
/* } */
/* auto diff = std::difftime(std::time(nullptr), _last_compact_time); */
/* if (diff >= interval) { */
/* return true; */
/* } */
/* return false; */
/* } */
Status MemManager::serialize() {
mark_memory_as_immutable();
for (auto& mem : _immMems) {
mem->serialize()
}
_immMems.clear();
/* _last_compact_time = std::time(nullptr); */
}
} // namespace vecengine } // namespace vecengine
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <map> #include <map>
#include <string> #include <string>
#include <ctime>
#include "id_generators.h" #include "id_generators.h"
#include "status.h" #include "status.h"
...@@ -42,20 +43,28 @@ class Meta; ...@@ -42,20 +43,28 @@ class Meta;
class MemManager { class MemManager {
public: public:
MemManager(const std::shared_ptr<Meta>& meta_) : _pMeta(meta_) {} typedef std::shared_ptr<MemVectors> VectorsPtr;
MemManager(const std::shared_ptr<Meta>& meta_)
: _pMeta(meta_), _last_compact_time(std::time(nullptr)) {}
MemVectors* get_mem_by_group(const std::string& group_id_); VectorsPtr get_mem_by_group(const std::string& group_id_);
Status add_vectors(const std::string& group_id_, Status add_vectors(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_); size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status serialize();
private: private:
Status add_vectors_no_lock(const std::string& group_id_, Status add_vectors_no_lock(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_); size_t n_, const float* vectors_, IDNumbers& vector_ids_);
typedef std::map<std::string, MemVectors> MemMap; typedef std::map<std::string, VectorsPtr> MemMap;
typedef std::vector<VectorsPtr> ImmMemPool;
MemMap _memMap; MemMap _memMap;
ImmMemPool _immMems;
std::shared_ptr<Meta> _pMeta; std::shared_ptr<Meta> _pMeta;
std::time_t _last_compact_time;
std::mutex _mutex;
}; // MemManager }; // MemManager
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册