提交 44f5d3e7 编写于 作者: X Xu Peng

feat(db): meta operation


Former-commit-id: 6f7cd4ec069195d659585b321fc5f6e6f20bb719
上级 6e1e2828
...@@ -104,7 +104,9 @@ void DBImpl::background_call() { ...@@ -104,7 +104,9 @@ void DBImpl::background_call() {
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
const meta::GroupFilesSchema& files) { const meta::GroupFilesSchema& files) {
meta::GroupFileSchema group_file; meta::GroupFileSchema group_file;
Status status = _pMeta->add_group_file(group_id, date, group_file); group_file.group_id = group_id;
group_file.date = date;
Status status = _pMeta->add_group_file(group_file);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
...@@ -161,8 +163,9 @@ Status DBImpl::background_merge_files(const std::string& group_id) { ...@@ -161,8 +163,9 @@ Status DBImpl::background_merge_files(const std::string& group_id) {
Status DBImpl::build_index(const meta::GroupFileSchema& file) { Status DBImpl::build_index(const meta::GroupFileSchema& file) {
meta::GroupFileSchema group_file; meta::GroupFileSchema group_file;
Status status = _pMeta->add_group_file(file.group_id, file.date, group_file.group_id = file.group_id;
group_file, meta::GroupFileSchema::INDEX); group_file.date = file.date;
Status status = _pMeta->add_group_file(group_file);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
...@@ -176,16 +179,14 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { ...@@ -176,16 +179,14 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) {
dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(), dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
from_index->id_map.data()); from_index->id_map.data());
/* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */ /* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */
// PXU TODO: Remove
/* auto location = group_file.location + ".index"; */
write_index(index, group_file.location.c_str()); write_index(index, group_file.location.c_str());
group_file.file_type = meta::GroupFileSchema::INDEX; group_file.file_type = meta::GroupFileSchema::INDEX;
/* auto to_remove = file; */ auto to_remove = file;
/* to_remove.file_type = TO_DELETE; */ to_remove.file_type = meta::GroupFileSchema::TO_DELETE;
/* GroupFilesSchema update_files = {to_remove, group_file}; */ meta::GroupFilesSchema update_files = {to_remove, group_file};
/* _pMeta->update_files(update_files); */ _pMeta->update_files(update_files);
return Status::OK(); return Status::OK();
} }
......
...@@ -17,11 +17,19 @@ using namespace sqlite_orm; ...@@ -17,11 +17,19 @@ using namespace sqlite_orm;
inline auto StoragePrototype(const std::string& path) { inline auto StoragePrototype(const std::string& path) {
return make_storage(path, return make_storage(path,
make_table("Groups", make_table("Group",
make_column("id", &GroupSchema::id, primary_key()), make_column("id", &GroupSchema::id, primary_key()),
make_column("group_id", &GroupSchema::group_id, unique()), make_column("group_id", &GroupSchema::group_id, unique()),
make_column("dimension", &GroupSchema::dimension), make_column("dimension", &GroupSchema::dimension),
make_column("files_cnt", &GroupSchema::files_cnt, default_value(0)))); make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
make_table("GroupFile",
make_column("id", &GroupFileSchema::id, primary_key()),
make_column("group_id", &GroupFileSchema::group_id),
make_column("file_id", &GroupFileSchema::file_id),
make_column("file_type", &GroupFileSchema::file_type),
make_column("rows", &GroupFileSchema::rows, default_value(0)),
make_column("date", &GroupFileSchema::date))
);
} }
...@@ -35,6 +43,26 @@ long GetFileSize(const std::string& filename) ...@@ -35,6 +43,26 @@ long GetFileSize(const std::string& filename)
return rc == 0 ? stat_buf.st_size : -1; return rc == 0 ? stat_buf.st_size : -1;
} }
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
return _options.path + "/" + group_id;
}
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
if (group_file.date == EmptyDate) {
group_file.date = Meta::GetDate();
}
std::stringstream ss;
ss << GetGroupDatePartitionPath(group_file.group_id, group_file.date)
<< "/" << group_file.file_id;
group_file.location = ss.str();
}
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_) DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
: _options(options_) { : _options(options_) {
initialize(); initialize();
...@@ -103,74 +131,105 @@ Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { ...@@ -103,74 +131,105 @@ Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
return Status::OK(); return Status::OK();
} }
Status DBMetaImpl::add_group_file(const std::string& group_id, Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
GroupFileSchema& group_file_info, if (group_file.date == EmptyDate) {
GroupFileSchema::FILE_TYPE file_type) { group_file.date = Meta::GetDate();
return add_group_file(group_id, Meta::GetDate(), group_file_info); }
} GroupSchema group_info;
group_info.group_id = group_file.group_id;
auto status = get_group(group_info);
if (!status.ok()) {
return status;
}
Status DBMetaImpl::add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type) {
//PXU TODO
std::stringstream ss;
SimpleIDGenerator g; SimpleIDGenerator g;
std::string suffix = (file_type == GroupFileSchema::RAW) ? ".raw" : ".index"; group_file.file_type = GroupFileSchema::NEW;
/* ss << "/tmp/test/" << date */ group_file.file_id = g.getNextIDNumber();
ss << _options.path << "/" << date group_file.dimension = group_info.dimension;
<< "/" << g.getNextIDNumber() GetGroupFilePath(group_file);
<< suffix; try {
group_file_info.group_id = "1"; auto id = ConnectorPtr->insert(group_file);
group_file_info.dimension = 64; std::cout << __func__ << " id=" << id << std::endl;
group_file_info.location = ss.str(); group_info.id = id;
group_file_info.date = date; } catch(std::system_error& e) {
return Status::GroupError("Add GroupFile Group "
+ group_info.group_id + " File " + group_file.file_id + " Error");
}
return Status::OK(); return Status::OK();
} }
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
// PXU TODO
files.clear(); files.clear();
std::stringstream ss;
/* ss << "/tmp/test/" << Meta::GetDate(); */ auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
ss << _options.path << "/" << Meta::GetDate(); &GroupFileSchema::group_id,
boost::filesystem::path path(ss.str().c_str()); &GroupFileSchema::file_id,
boost::filesystem::directory_iterator end_itr; &GroupFileSchema::file_type,
for (boost::filesystem::directory_iterator itr(path); itr != end_itr; ++itr) { &GroupFileSchema::rows,
/* std::cout << itr->path().string() << std::endl; */ &GroupFileSchema::date),
GroupFileSchema f; where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
f.location = itr->path().string();
std::string suffixStr = f.location.substr(f.location.find_last_of('.') + 1); std::map<std::string, GroupSchema> groups;
if (suffixStr == "index") continue;
if (1024*1024*1000 >= GetFileSize(f.location)) continue; for (auto& file : selected) {
std::cout << "[About to index] " << f.location << std::endl; GroupFileSchema group_file;
f.date = Meta::GetDate(); group_file.id = std::get<0>(file);
files.push_back(f); group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.date = std::get<5>(file);
auto groupItr = groups.find(group_file.group_id);
if (groupItr == groups.end()) {
GroupSchema group_info;
group_info.group_id = group_file.group_id;
auto status = get_group(group_info);
if (!status.ok()) {
return status;
}
groups[group_file.group_id] = group_info;
}
group_file.dimension = groups[group_file.group_id].dimension;
files.push_back(group_file);
} }
return Status::OK(); return Status::OK();
} }
Status DBMetaImpl::files_to_merge(const std::string& group_id, Status DBMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) { DatePartionedGroupFilesSchema& files) {
//PXU TODO
files.clear(); files.clear();
std::stringstream ss;
/* ss << "/tmp/test/" << Meta::GetDate(); */ auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
ss << _options.path << "/" << Meta::GetDate(); &GroupFileSchema::group_id,
boost::filesystem::path path(ss.str().c_str()); &GroupFileSchema::file_id,
boost::filesystem::directory_iterator end_itr; &GroupFileSchema::file_type,
GroupFilesSchema gfiles; &GroupFileSchema::rows,
DateT date = Meta::GetDate(); &GroupFileSchema::date));
files[date] = gfiles; /* where(is_equal(&GroupFileSchema::file_type, GroupFileSchema::RAW) && */
for (boost::filesystem::directory_iterator itr(path); itr != end_itr; ++itr) { /* is_equal(&GroupFileSchema::group_id, group_id))); */
/* std::cout << itr->path().string() << std::endl; */
GroupFileSchema f; GroupSchema group_info;
f.location = itr->path().string(); group_info.group_id = group_id;
std::string suffixStr = f.location.substr(f.location.find_last_of('.') + 1); auto status = get_group(group_info);
if (suffixStr == "index") continue; if (!status.ok()) {
if (1024*1024*1000 < GetFileSize(f.location)) continue; return status;
std::cout << "About to merge " << f.location << std::endl; }
files[date].push_back(f);
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.date = std::get<5>(file);
group_file.dimension = group_info.dimension;
auto dateItr = files.find(group_file.date);
if (dateItr == files.end()) {
files[group_file.date] = GroupFilesSchema();
}
files[group_file.date].push_back(group_file);
} }
return Status::OK(); return Status::OK();
...@@ -203,7 +262,15 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) { ...@@ -203,7 +262,15 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
} }
Status DBMetaImpl::update_files(const GroupFilesSchema& files) { Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
//PXU TODO auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
ConnectorPtr->update(file);
}
return true;
});
if (!commited) {
return Status::DBTransactionError("Update files Error");
}
return Status::OK(); return Status::OK();
} }
......
...@@ -19,14 +19,7 @@ public: ...@@ -19,14 +19,7 @@ public:
virtual Status get_group(GroupSchema& group_info_) override; virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(const std::string& group_id, virtual Status add_group_file(GroupFileSchema& group_file_info) override;
DateT date,
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
virtual Status has_group_file(const std::string& group_id_, virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_, const std::string& file_id_,
...@@ -49,6 +42,9 @@ public: ...@@ -49,6 +42,9 @@ public:
private: private:
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
void GetGroupFilePath(GroupFileSchema& group_file);
Status initialize(); Status initialize();
const DBMetaOptions _options; const DBMetaOptions _options;
......
...@@ -133,26 +133,17 @@ Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { ...@@ -133,26 +133,17 @@ Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
return Status::OK(); return Status::OK();
} }
Status LocalMetaImpl::add_group_file(const std::string& group_id, Status LocalMetaImpl::add_group_file(GroupFileSchema& group_file_info) {
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type) {
return add_group_file(group_id, Meta::GetDate(), group_file_info);
}
Status LocalMetaImpl::add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type) {
GroupSchema group_info; GroupSchema group_info;
auto status = get_group(group_info); /* auto status = get_group(group_info); */
if (!status.ok()) { /* if (!status.ok()) { */
return status; /* return status; */
} /* } */
auto location = GetNextGroupFileLocationByPartition(group_id, date, file_type); /* auto location = GetNextGroupFileLocationByPartition(group_id, date, file_type); */
group_file_info.group_id = group_id; /* group_file_info.group_id = group_id; */
group_file_info.dimension = group_info.dimension; /* group_file_info.dimension = group_info.dimension; */
group_file_info.location = location; /* group_file_info.location = location; */
group_file_info.date = date; /* group_file_info.date = date; */
return Status::OK(); return Status::OK();
} }
......
...@@ -18,14 +18,7 @@ public: ...@@ -18,14 +18,7 @@ public:
virtual Status get_group(GroupSchema& group_info_) override; virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(const std::string& group_id, virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
DateT date,
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
virtual Status has_group_file(const std::string& group_id_, virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_, const std::string& file_id_,
......
...@@ -72,7 +72,8 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) { ...@@ -72,7 +72,8 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) {
} }
meta::GroupFileSchema group_file; meta::GroupFileSchema group_file;
auto status = _pMeta->add_group_file(group_id, group_file); group_file.group_id = group_id;
auto status = _pMeta->add_group_file(group_file);
if (!status.ok()) { if (!status.ok()) {
return nullptr; return nullptr;
} }
......
...@@ -14,6 +14,7 @@ namespace engine { ...@@ -14,6 +14,7 @@ namespace engine {
namespace meta { namespace meta {
typedef int DateT; typedef int DateT;
const DateT EmptyDate = -1;
struct GroupSchema { struct GroupSchema {
size_t id; size_t id;
...@@ -28,6 +29,7 @@ struct GroupFileSchema { ...@@ -28,6 +29,7 @@ struct GroupFileSchema {
typedef enum { typedef enum {
NEW, NEW,
RAW, RAW,
TO_INDEX,
INDEX, INDEX,
TO_DELETE, TO_DELETE,
} FILE_TYPE; } FILE_TYPE;
...@@ -37,7 +39,7 @@ struct GroupFileSchema { ...@@ -37,7 +39,7 @@ struct GroupFileSchema {
std::string file_id; std::string file_id;
int file_type = NEW; int file_type = NEW;
size_t rows; size_t rows;
DateT date; DateT date = EmptyDate;
uint16_t dimension; uint16_t dimension;
std::string location = ""; std::string location = "";
}; // GroupFileSchema }; // GroupFileSchema
...@@ -52,13 +54,7 @@ public: ...@@ -52,13 +54,7 @@ public:
virtual Status get_group(GroupSchema& group_info) = 0; virtual Status get_group(GroupSchema& group_info) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status add_group_file(const std::string& group_id_, virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
GroupFileSchema& group_file_info_,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) = 0;
virtual Status add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info,
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) = 0;
virtual Status has_group_file(const std::string& group_id_, virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_, const std::string& file_id_,
......
...@@ -28,6 +28,9 @@ public: ...@@ -28,6 +28,9 @@ public:
static Status GroupError(const std::string& msg, const std::string& msg2="") { static Status GroupError(const std::string& msg, const std::string& msg2="") {
return Status(kGroupError, msg, msg2); return Status(kGroupError, msg, msg2);
} }
static Status DBTransactionError(const std::string& msg, const std::string& msg2="") {
return Status(kDBTransactionError, msg, msg2);
}
bool ok() const { return state_ == nullptr; } bool ok() const { return state_ == nullptr; }
...@@ -35,6 +38,7 @@ public: ...@@ -35,6 +38,7 @@ public:
bool IsInvalidDBPath() const { return code() == kInvalidDBPath; } bool IsInvalidDBPath() const { return code() == kInvalidDBPath; }
bool IsGroupError() const { return code() == kGroupError; } bool IsGroupError() const { return code() == kGroupError; }
bool IsDBTransactionError() const { return code() == kDBTransactionError; }
std::string ToString() const; std::string ToString() const;
...@@ -47,6 +51,7 @@ private: ...@@ -47,6 +51,7 @@ private:
kInvalidDBPath, kInvalidDBPath,
kGroupError, kGroupError,
kDBTransactionError,
}; };
Code code() const { Code code() const {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册