提交 71d9c14b 编写于 作者: G groot

Merge branch 'jinhai' of 192.168.1.105:jinhai/vecwise_engine into jinhai


Former-commit-id: 1fb48f7b39f74b57aad58ef6c73b9ba8fd709410
......@@ -242,6 +242,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) {
try_build_index();
}
_pMeta->cleanup_ttl_files(1);
return Status::OK();
}
......@@ -255,13 +257,18 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) {
}
auto opd = std::make_shared<Operand>();
opd->d = file.dimension;
opd->index_type = "IDMap,Flat";
IndexBuilderPtr pBuilder = GetIndexBuilder(opd);
auto from_index = dynamic_cast<faiss::IndexIDMap*>(faiss::read_index(file.location.c_str()));
std::cout << "Preparing build_index for file_id=" << file.file_id
<< " with new index_file_id=" << group_file.file_id << std::endl;
auto index = pBuilder->build_all(from_index->ntotal,
dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
from_index->id_map.data());
std::cout << "Ending build_index for file_id=" << file.file_id
<< " with new index_file_id=" << group_file.file_id << std::endl;
/* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */
write_index(index, group_file.location.c_str());
group_file.file_type = meta::GroupFileSchema::INDEX;
......
......@@ -3,6 +3,7 @@
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <sqlite_orm/sqlite_orm.h>
#include "DBMetaImpl.h"
......@@ -28,6 +29,7 @@ inline auto StoragePrototype(const std::string& path) {
make_column("file_id", &GroupFileSchema::file_id),
make_column("file_type", &GroupFileSchema::file_type),
make_column("rows", &GroupFileSchema::rows, default_value(0)),
make_column("updated_time", &GroupFileSchema::updated_time),
make_column("date", &GroupFileSchema::date))
);
......@@ -47,6 +49,14 @@ std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
return _options.path + "/" + group_id;
}
// Returns the current std::chrono::system_clock time expressed as the number
// of microseconds elapsed since that clock's epoch.
long DBMetaImpl::GetMicroSecTimeStamp() {
    using std::chrono::microseconds;
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(since_epoch).count();
}
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date;
......@@ -166,6 +176,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
group_file.file_id = ss.str();
group_file.dimension = group_info.dimension;
group_file.rows = 0;
group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front();
GetGroupFilePath(group_file);
{
......@@ -329,7 +340,8 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
return Status::OK();
}
Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) {
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = GetMicroSecTimeStamp();
auto commited = ConnectorPtr->transaction([&] () mutable {
ConnectorPtr->update(group_file);
return true;
......@@ -340,9 +352,10 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) {
return Status::OK();
}
Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
file.updated_time = GetMicroSecTimeStamp();
ConnectorPtr->update(file);
}
return true;
......@@ -353,6 +366,38 @@ Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
return Status::OK();
}
// Purges group files whose time-to-live has expired.
//
// Selects every file record that is marked TO_DELETE and whose updated_time
// lies more than `seconds` seconds in the past, removes the backing file from
// disk, and then deletes the record from the meta store.
//
// @param seconds  Grace period, in seconds, that a TO_DELETE file is kept
//                 after its last update before it is physically removed.
// @return Status::OK() once all selected records have been processed.
//         Exceptions thrown by the storage layer or boost::filesystem are
//         not caught here and propagate to the caller.
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    const long now = GetMicroSecTimeStamp();

    // updated_time is written from GetMicroSecTimeStamp(), i.e. in
    // microseconds. Widen `seconds` BEFORE multiplying: `1000000 * seconds`
    // would be evaluated in int and overflow for seconds > 2147.
    const long expire_before = now - static_cast<long>(seconds) * 1000000L;

    // A file has outlived its TTL when its last update is OLDER than the
    // cut-off, hence `<`. (The previous `>` picked the files that were marked
    // most recently, which are exactly the ones the TTL is meant to protect.)
    auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                 &GroupFileSchema::group_id,
                                                 &GroupFileSchema::file_id,
                                                 &GroupFileSchema::file_type,
                                                 &GroupFileSchema::rows,
                                                 &GroupFileSchema::date),
                                         where(c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_DELETE) and
                                               c(&GroupFileSchema::updated_time) < expire_before));

    for (auto& file : selected) {
        GroupFileSchema group_file;
        group_file.id = std::get<0>(file);
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.date = std::get<5>(file);

        // Fills in group_file.location, which is read below.
        GetGroupFilePath(group_file);

        // Defensive re-check of the query filter before touching the disk.
        if (group_file.file_type == GroupFileSchema::TO_DELETE) {
            boost::filesystem::remove(group_file.location);
        }

        // Drop the metadata row only after the file removal did not throw.
        ConnectorPtr->remove<GroupFileSchema>(group_file.id);
        std::cout << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl;
    }

    return Status::OK();
}
Status DBMetaImpl::cleanup() {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
......
......@@ -27,13 +27,13 @@ public:
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) override;
virtual Status update_group_file(const GroupFileSchema& group_file_) override;
virtual Status update_group_file(GroupFileSchema& group_file_) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status update_files(const GroupFilesSchema& files) override;
virtual Status update_files(GroupFilesSchema& files) override;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) override;
......@@ -46,10 +46,13 @@ public:
virtual Status cleanup() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual ~DBMetaImpl();
private:
long GetMicroSecTimeStamp();
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
......
......@@ -226,12 +226,12 @@ Status LocalMetaImpl::get_group_files(const std::string& group_id_,
return Status::OK();
}
Status LocalMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
// Placeholder implementation: the body performs no work, never reads or
// modifies group_file_, and unconditionally returns Status::OK().
Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) {
// TODO(PXU): not implemented yet.
return Status::OK();
}
Status LocalMetaImpl::update_files(const GroupFilesSchema& files) {
// Placeholder implementation: the body performs no work, never reads or
// modifies files, and unconditionally returns Status::OK().
Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
// TODO(PXU): not implemented yet.
return Status::OK();
}
......@@ -241,6 +241,11 @@ Status LocalMetaImpl::cleanup() {
return Status::OK();
}
// Placeholder implementation: nothing is cleaned up; the seconds argument is
// ignored and Status::OK() is returned unconditionally.
Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) {
// TODO(PXU): not implemented yet.
return Status::OK();
}
} // namespace meta
} // namespace engine
} // namespace vecwise
......
......@@ -26,13 +26,13 @@ public:
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) override;
virtual Status update_group_file(const GroupFileSchema& group_file_) override;
virtual Status update_group_file(GroupFileSchema& group_file_) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status update_files(const GroupFilesSchema& files) override;
virtual Status update_files(GroupFilesSchema& files) override;
virtual Status cleanup() override;
......@@ -41,6 +41,8 @@ public:
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
private:
Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info);
......
......@@ -42,6 +42,7 @@ struct GroupFileSchema {
DateT date = EmptyDate;
uint16_t dimension;
std::string location = "";
long updated_time;
}; // GroupFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
......@@ -62,13 +63,13 @@ public:
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) = 0;
virtual Status update_group_file(const GroupFileSchema& group_file_) = 0;
virtual Status update_group_file(GroupFileSchema& group_file_) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) = 0;
virtual Status update_files(const GroupFilesSchema& files) = 0;
virtual Status update_files(GroupFilesSchema& files) = 0;
virtual Status files_to_search(const std::string& group_id,
std::vector<DateT> partition,
......@@ -80,6 +81,7 @@ public:
virtual Status files_to_index(GroupFilesSchema&) = 0;
virtual Status cleanup() = 0;
virtual Status cleanup_ttl_files(uint16_t) = 0;
static DateT GetDate(const std::time_t& t);
static DateT GetDate();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册