提交 4a17f744 编写于 作者: J jinhai

Merge branch 'impl_simple_delete' into 'branch-1.2'

MS-5 Implement Auto Archive Feature

See merge request megasearch/vecwise_engine!33

Former-commit-id: 03823135b54a184364789fc8e8373d489c99cdbb
......@@ -10,7 +10,9 @@ Please mark all change in change log and use the ticket from JIRA.
### New Feature
- MS-5 - Implement Auto Archive Feature
### Task
- MS-1 - Add CHANGELOG.md
- MS-4 - Refactor the vecwise_engine code structure
\ No newline at end of file
- MS-4 - Refactor the vecwise_engine code structure
......@@ -23,6 +23,8 @@ public:
virtual Status add_group(meta::GroupSchema& group_info_) = 0;
virtual Status get_group(meta::GroupSchema& group_info_) = 0;
virtual Status delete_vectors(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
......@@ -37,6 +39,8 @@ public:
virtual Status search(const std::string& group_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0;
virtual Status size(long& result) = 0;
virtual Status drop_all() = 0;
virtual Status count(const std::string& group_id, long& result) = 0;
......
......@@ -44,6 +44,12 @@ Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
return _pMeta->get_group(group_info);
}
// Deletes the vectors of group_id belonging to the given date partitions
// by delegating to the meta layer's delete_group_partitions.
template<typename EngineT>
Status DBImpl<EngineT>::delete_vectors(const std::string& group_id,
const meta::DatesT& dates) {
return _pMeta->delete_group_partitions(group_id, dates);
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
return _pMeta->has_group(group_id_, has_or_not_);
......@@ -286,7 +292,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
} else {
group_file.file_type = meta::GroupFileSchema::RAW;
}
group_file.rows = index_size;
group_file.size = index_size;
updated.push_back(group_file);
status = _pMeta->update_files(updated);
LOG(DEBUG) << "New merged file " << group_file.file_id <<
......@@ -320,6 +326,8 @@ Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
merge_files(group_id, kv.first, kv.second);
}
_pMeta->archive_files();
try_build_index();
_pMeta->cleanup_ttl_files(1);
......@@ -343,7 +351,7 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
auto index = to_index.BuildIndex(group_file.location);
group_file.file_type = meta::GroupFileSchema::INDEX;
group_file.rows = index->Size();
group_file.size = index->Size();
auto to_remove = file;
to_remove.file_type = meta::GroupFileSchema::TO_DELETE;
......@@ -356,6 +364,7 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
<< " from file " << to_remove.file_id;
index->Cache();
_pMeta->archive_files();
return Status::OK();
}
......@@ -416,6 +425,11 @@ Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
return _pMeta->count(group_id, result);
}
// Reports the total size of stored files by delegating to the meta layer.
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
return _pMeta->size(result);
}
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
{
......
......@@ -35,6 +35,7 @@ public:
virtual Status add_group(meta::GroupSchema& group_info) override;
virtual Status get_group(meta::GroupSchema& group_info) override;
virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status get_group_files(const std::string& group_id_,
......@@ -54,6 +55,8 @@ public:
virtual Status count(const std::string& group_id, long& result) override;
virtual Status size(long& result) override;
virtual ~DBImpl();
private:
......
......@@ -12,8 +12,11 @@
#include <fstream>
#include <sqlite_orm.h>
#include <easylogging++.h>
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
namespace zilliz {
namespace vecwise {
......@@ -28,39 +31,26 @@ inline auto StoragePrototype(const std::string& path) {
make_column("id", &GroupSchema::id, primary_key()),
make_column("group_id", &GroupSchema::group_id, unique()),
make_column("dimension", &GroupSchema::dimension),
make_column("created_on", &GroupSchema::created_on),
make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
make_table("GroupFile",
make_column("id", &GroupFileSchema::id, primary_key()),
make_column("group_id", &GroupFileSchema::group_id),
make_column("file_id", &GroupFileSchema::file_id),
make_column("file_type", &GroupFileSchema::file_type),
make_column("rows", &GroupFileSchema::rows, default_value(0)),
make_column("size", &GroupFileSchema::size, default_value(0)),
make_column("updated_time", &GroupFileSchema::updated_time),
make_column("created_on", &GroupFileSchema::created_on),
make_column("date", &GroupFileSchema::date))
);
}
using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3"));
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
long GetFileSize(const std::string& filename)
{
struct stat stat_buf;
int rc = stat(filename.c_str(), &stat_buf);
return rc == 0 ? stat_buf.st_size : -1;
}
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
return _options.path + "/" + group_id;
}
long DBMetaImpl::GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
return micros;
return _options.path + "/tables/" + group_id;
}
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
......@@ -79,6 +69,22 @@ void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
group_file.location = ss.str();
}
// Generates the next unique group id and renders it as a decimal string.
Status DBMetaImpl::NextGroupId(std::string& group_id) {
    SimpleIDGenerator id_generator;
    std::stringstream id_stream;
    id_stream << id_generator.getNextIDNumber();
    group_id = id_stream.str();
    return Status::OK();
}
// Generates the next unique file id and renders it as a decimal string.
Status DBMetaImpl::NextFileId(std::string& file_id) {
    SimpleIDGenerator id_generator;
    std::stringstream id_stream;
    id_stream << id_generator.getNextIDNumber();
    file_id = id_stream.str();
    return Status::OK();
}
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
: _options(options_) {
initialize();
......@@ -104,21 +110,56 @@ Status DBMetaImpl::initialize() {
return Status::OK();
}
// PXU TODO: Temp solution. Will fix later
// PXU TODO: Temp solution. Will fix later
// Marks every file of group_id in the given date partitions as TO_DELETE.
// Partitions dated yesterday or later are refused, since they may still be
// receiving writes. Returns the underlying get_group error if the group
// does not exist; rethrows any database error after logging it.
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
                                           const meta::DatesT& dates) {
    if (dates.size() == 0) {
        return Status::OK();
    }

    GroupSchema group_info;
    group_info.group_id = group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    auto yesterday = GetDateWithDelta(-1);

    for (auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions within 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                set(
                    c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                   ),
                where(
                    c(&GroupFileSchema::group_id) == group_id and
                    in(&GroupFileSchema::date, dates)
                    ));
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw; // rethrow preserving the original exception type (throw e; would slice)
    }
    return Status::OK();
}
Status DBMetaImpl::add_group(GroupSchema& group_info) {
if (group_info.group_id == "") {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.getNextIDNumber();
group_info.group_id = ss.str();
NextGroupId(group_info.group_id);
}
group_info.files_cnt = 0;
group_info.id = -1;
group_info.created_on = utils::GetMicroSecTimeStamp();
{
try {
auto id = ConnectorPtr->insert(group_info);
group_info.id = id;
/* LOG(DEBUG) << "Add group " << id; */
} catch (...) {
return Status::DBTransactionError("Add Group Error");
}
......@@ -127,7 +168,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
auto group_path = GetGroupPath(group_info.group_id);
if (!boost::filesystem::is_directory(group_path)) {
auto ret = boost::filesystem::create_directory(group_path);
auto ret = boost::filesystem::create_directories(group_path);
if (!ret) {
LOG(ERROR) << "Create directory " << group_path << " Error";
}
......@@ -192,21 +233,18 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
return status;
}
SimpleIDGenerator g;
std::stringstream ss;
ss << g.getNextIDNumber();
NextFileId(group_file.file_id);
group_file.file_type = GroupFileSchema::NEW;
group_file.file_id = ss.str();
group_file.dimension = group_info.dimension;
group_file.rows = 0;
group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front();
group_file.size = 0;
group_file.created_on = utils::GetMicroSecTimeStamp();
group_file.updated_time = group_file.created_on;
GetGroupFilePath(group_file);
{
try {
auto id = ConnectorPtr->insert(group_file);
group_file.id = id;
/* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */
} catch (...) {
return Status::DBTransactionError("Add file Error");
}
......@@ -233,7 +271,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
......@@ -245,7 +283,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
auto groupItr = groups.find(group_file.group_id);
......@@ -281,7 +319,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::group_id) == group_id and
in(&GroupFileSchema::date, dates) and
......@@ -302,7 +340,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
group_file.dimension = group_info.dimension;
GetGroupFilePath(group_file);
......@@ -329,7 +367,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
c(&GroupFileSchema::group_id) == group_id));
......@@ -347,7 +385,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
group_file.dimension = group_info.dimension;
GetGroupFilePath(group_file);
......@@ -375,7 +413,32 @@ Status DBMetaImpl::has_group_file(const std::string& group_id_,
// Loads the metadata row for (group_id_, file_id_) into group_file_info_.
// Returns NotFound when no matching row exists; rethrows any database
// error after logging it.
Status DBMetaImpl::get_group_file(const std::string& group_id_,
                                  const std::string& file_id_,
                                  GroupFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                  &GroupFileSchema::group_id,
                                                  &GroupFileSchema::file_id,
                                                  &GroupFileSchema::file_type,
                                                  &GroupFileSchema::size,
                                                  &GroupFileSchema::date),
                where(c(&GroupFileSchema::file_id) == file_id_ and
                      c(&GroupFileSchema::group_id) == group_id_
                      ));
        // (group_id, file_id) is expected to identify at most one row.
        assert(files.size() <= 1);
        if (files.size() == 1) {
            group_file_info_.id = std::get<0>(files[0]);
            group_file_info_.group_id = std::get<1>(files[0]);
            group_file_info_.file_id = std::get<2>(files[0]);
            group_file_info_.file_type = std::get<3>(files[0]);
            group_file_info_.size = std::get<4>(files[0]);
            group_file_info_.date = std::get<5>(files[0]);
        } else {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw; // rethrow preserving the original exception type (throw e; would slice)
    }

    return Status::OK();
}
......@@ -386,17 +449,117 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
return Status::OK();
}
// PXU TODO: Support Swap
// PXU TODO: Support Swap
// Applies the configured archive criterias to file metadata:
//   "days:N" -> mark files created more than N days ago as TO_DELETE;
//   "disk:N" -> when total stored size exceeds N GB, discard oldest
//               files via discard_files_of_size until under the limit.
// Rethrows any database error after logging it.
Status DBMetaImpl::archive_files() {
    auto criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& limit = kv.second;
        if (criteria == "days") {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                           ),
                        where(
                            c(&GroupFileSchema::created_on) < (long)(now - usecs) and
                            c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                            ));
            } catch (std::exception& e) {
                LOG(DEBUG) << e.what();
                throw; // rethrow preserving the original exception type
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            size(sum);

            // PXU TODO: refactor size
            auto to_delete = (sum - limit*G);
            discard_files_of_size(to_delete);
        }
    }
    return Status::OK();
}
// Sums the `size` column over every file not marked TO_DELETE into
// result. Rethrows any database error after logging it.
Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)),
                where(
                    c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                    ));

        for (auto& sub_query : selected) {
            // The SQL aggregate is nullable (NULL when no rows match).
            if (!std::get<0>(sub_query)) {
                continue;
            }
            result += (long)(*std::get<0>(sub_query));
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw; // rethrow preserving the original exception type (throw e; would slice)
    }

    return Status::OK();
}
// Marks files TO_DELETE, lowest id first, until at least to_discard_size
// bytes are reclaimed. Candidates are fetched in batches of 10 and the
// function recurses until the target is met or no candidates remain.
// Rethrows any database error after logging it.
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    LOG(DEBUG) << "About to discard size=" << to_discard_size;
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::size),
                where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
                order_by(&GroupFileSchema::id),
                limit(10));

        // GroupFileSchema::id is size_t; keep ids in size_t to avoid truncation.
        std::vector<size_t> ids;

        for (auto& file : selected) {
            if (to_discard_size <= 0) break;
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.size = std::get<1>(file);
            ids.push_back(group_file.id);
            LOG(DEBUG) << "Discard group_file.id=" << group_file.id
                       << " group_file.size=" << group_file.size;
            to_discard_size -= group_file.size;
        }

        if (ids.size() == 0) {
            // No discardable files left; cannot reclaim further.
            return Status::OK();
        }

        ConnectorPtr->update_all(
                set(
                    c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                   ),
                where(
                    in(&GroupFileSchema::id, ids)
                    ));
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw; // rethrow preserving the original exception type (throw e; would slice)
    }

    return discard_files_of_size(to_discard_size);
}
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = GetMicroSecTimeStamp();
group_file.updated_time = utils::GetMicroSecTimeStamp();
try {
ConnectorPtr->update(group_file);
/* auto commited = ConnectorPtr->transaction([&] () mutable { */
/* ConnectorPtr->update(group_file); */
/* return true; */
/* }); */
/* if (!commited) { */
/* return Status::DBTransactionError("Update file Error"); */
/* } */
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
......@@ -409,7 +572,7 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
try {
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
file.updated_time = GetMicroSecTimeStamp();
file.updated_time = utils::GetMicroSecTimeStamp();
ConnectorPtr->update(file);
}
return true;
......@@ -425,16 +588,16 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
}
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
auto now = GetMicroSecTimeStamp();
auto now = utils::GetMicroSecTimeStamp();
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and
c(&GroupFileSchema::updated_time) > now - 1000000*seconds));
c(&GroupFileSchema::updated_time) > now - seconds*US_PS));
GroupFilesSchema updated;
......@@ -444,7 +607,7 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
if (group_file.file_type == GroupFileSchema::TO_DELETE) {
......@@ -467,7 +630,7 @@ Status DBMetaImpl::cleanup() {
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));
......@@ -480,7 +643,7 @@ Status DBMetaImpl::cleanup() {
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.rows = std::get<4>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
if (group_file.file_type == GroupFileSchema::TO_DELETE) {
......@@ -500,7 +663,7 @@ Status DBMetaImpl::cleanup() {
Status DBMetaImpl::count(const std::string& group_id, long& result) {
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows,
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
&GroupFileSchema::date),
where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
......
......@@ -24,6 +24,8 @@ public:
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(GroupFileSchema& group_file_info) override;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) override;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
......@@ -48,6 +50,10 @@ public:
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status archive_files() override;
virtual Status size(long& result) override;
virtual Status cleanup() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
......@@ -59,8 +65,9 @@ public:
virtual ~DBMetaImpl();
private:
long GetMicroSecTimeStamp();
Status NextFileId(std::string& file_id);
Status NextGroupId(std::string& group_id);
Status discard_files_of_size(long to_discard_size);
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <exception>
#include <string>
namespace zilliz {
namespace vecwise {
namespace engine {

// Base class for engine exceptions. Stores an optional message that is
// reported through what(); when the message is empty a generic default
// string is returned instead.
class Exception : public std::exception {
public:
    Exception(const std::string& message)
        : message_(message) {
    }

    Exception() = default;

    // Returns the stored message, or "Default Exception." when none was set.
    // noexcept replaces the deprecated dynamic exception spec throw().
    const char* what() const noexcept override {
        if (message_.empty()) {
            return "Default Exception.";
        }
        return message_.c_str();
    }

    ~Exception() noexcept override = default;

protected:
    std::string message_;
};

// Thrown when a caller supplies an argument that fails validation.
class InvalidArgumentException : public Exception {
public:
    InvalidArgumentException() : Exception("Invalid Argument") {}
    InvalidArgumentException(const std::string& message) : Exception(message) {}
};

// Thrown when a numeric value falls outside its permitted range.
class OutOfRangeException : public Exception {
public:
    OutOfRangeException() : Exception("Out Of Range") {}
    OutOfRangeException(const std::string& message) : Exception(message) {}
};

} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -47,12 +47,12 @@ size_t FaissExecutionEngine<IndexTrait>::Count() const {
template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::Size() const {
return (size_t)(Count() * pIndex_->d);
return (size_t)(Count() * pIndex_->d)*sizeof(float);
}
template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::PhysicalSize() const {
return (size_t)(Size()*sizeof(float));
return (size_t)(Count() * pIndex_->d)*sizeof(float);
}
template<class IndexTrait>
......
......@@ -241,6 +241,11 @@ Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
return Status::OK();
}
// Archiving is not implemented for the local filesystem meta backend;
// this stub reports success so callers can treat it as a no-op.
Status LocalMetaImpl::archive_files() {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::cleanup() {
//PXU TODO
return Status::OK();
......@@ -256,6 +261,11 @@ Status LocalMetaImpl::drop_all() {
return Status::OK();
}
// Size reporting is not implemented for the local filesystem meta
// backend; `result` is left untouched and success is returned.
Status LocalMetaImpl::size(long& result) {
// PXU TODO
return Status::OK();
}
Status LocalMetaImpl::count(const std::string& group_id, long& result) {
// PXU TODO
return Status::OK();
......
......@@ -22,7 +22,9 @@ public:
virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
virtual Status add_group_file(GroupFileSchema& group_file_info) override;
/* virtual Status delete_group_partitions(const std::string& group_id, */
/* const meta::DatesT& dates) override; */
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
......@@ -45,12 +47,16 @@ public:
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status archive_files() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status drop_all() override;
virtual Status size(long& result) override;
private:
Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info);
......
......@@ -13,6 +13,7 @@
#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"
namespace zilliz {
......@@ -48,16 +49,16 @@ size_t MemVectors<EngineT>::approximate_size() const {
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& group_id) {
group_id = schema_.group_id;
auto rows = approximate_size();
auto size = approximate_size();
pEE_->Serialize();
schema_.rows = rows;
schema_.file_type = (rows >= options_.index_trigger_size) ?
schema_.size = size;
schema_.file_type = (size >= options_.index_trigger_size) ?
meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW;
auto status = pMeta_->update_group_file(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id << " of size " << pEE_->PhysicalSize() / (1024*1024) << " M";
<< " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M";
pEE_->Cache();
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <ctime>
#include <stdio.h>
#include "Meta.h"
namespace zilliz {
......@@ -11,13 +12,33 @@ namespace vecwise {
namespace engine {
namespace meta {
DateT Meta::GetDate(const std::time_t& t) {
tm *ltm = std::localtime(&t);
return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday;
// Converts t, shifted by day_delta calendar days, into the engine's
// compact DateT encoding: tm_year*10000 + tm_mon*100 + tm_mday.
// NOTE(review): tm_year is years since 1900 and tm_mon is 0-based, so
// this is NOT a human-readable YYYYMMDD; the raw encoding is kept
// because stored partition dates already use it — confirm before changing.
DateT Meta::GetDate(const std::time_t& t, int day_delta) {
    struct tm ltm;
    localtime_r(&t, &ltm);
    if (day_delta != 0) {
        // mktime normalizes out-of-range tm_mday (e.g. 0 or 35) into a
        // valid calendar date, handling month/year rollover.
        ltm.tm_mday += day_delta;
        mktime(&ltm);
    }
    return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
}
// Returns the current date shifted by day_delta days, in DateT encoding.
DateT Meta::GetDateWithDelta(int day_delta) {
return GetDate(std::time(nullptr), day_delta);
}
DateT Meta::GetDate() {
return GetDate(std::time(nullptr));
return GetDate(std::time(nullptr), 0);
}
} // namespace meta
......
......@@ -4,14 +4,11 @@
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <cstddef>
#include <vector>
#include <map>
#include <ctime>
#include <memory>
#include "MetaTypes.h"
#include "Options.h"
#include "Status.h"
......@@ -20,44 +17,7 @@ namespace vecwise {
namespace engine {
namespace meta {
typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
struct GroupSchema {
size_t id;
std::string group_id;
size_t files_cnt = 0;
uint16_t dimension;
std::string location = "";
}; // GroupSchema
struct GroupFileSchema {
typedef enum {
NEW,
RAW,
TO_INDEX,
INDEX,
TO_DELETE,
} FILE_TYPE;
size_t id;
std::string group_id;
std::string file_id;
int file_type = NEW;
size_t rows;
DateT date = EmptyDate;
uint16_t dimension;
std::string location = "";
long updated_time;
}; // GroupFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;
class Meta;
class Meta {
public:
typedef std::shared_ptr<Meta> Ptr;
......@@ -67,6 +27,8 @@ public:
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
......@@ -89,6 +51,10 @@ public:
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) = 0;
virtual Status size(long& result) = 0;
virtual Status archive_files() = 0;
virtual Status files_to_index(GroupFilesSchema&) = 0;
virtual Status cleanup() = 0;
......@@ -98,8 +64,9 @@ public:
virtual Status count(const std::string& group_id, long& result) = 0;
static DateT GetDate(const std::time_t& t);
static DateT GetDate(const std::time_t& t, int day_delta = 0);
static DateT GetDate();
static DateT GetDateWithDelta(int day_delta);
}; // MetaData
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {

// Binary size units, in bytes.
constexpr size_t K = 1024UL;
constexpr size_t M = K*K;
constexpr size_t G = K*M;
constexpr size_t T = K*G;

// Sub-second tick counts per second.
constexpr size_t S_PS = 1UL;         // seconds per second
constexpr size_t MS_PS = 1000*S_PS;  // milliseconds per second
constexpr size_t US_PS = 1000*MS_PS; // microseconds per second
constexpr size_t NS_PS = 1000*US_PS; // nanoseconds per second

// Calendar durations, in seconds.
constexpr size_t SECOND = 1UL;
constexpr size_t M_SEC = 60*SECOND; // minute
constexpr size_t H_SEC = 60*M_SEC;  // hour
constexpr size_t D_SEC = 24*H_SEC;  // day
constexpr size_t W_SEC = 7*D_SEC;   // week

} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <map>
#include <string>
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {

// Compact date encoding used to partition group files (see Meta::GetDate).
typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;

// Metadata describing a vector group (table).
// All scalar members carry default initializers so a freshly constructed
// schema never exposes indeterminate values.
struct GroupSchema {
    size_t id = 0;               // database row id
    std::string group_id;        // user-visible group identifier
    size_t files_cnt = 0;        // number of files in the group
    uint16_t dimension = 0;      // vector dimension
    std::string location = "";   // filesystem path of the group
    long created_on = 0;         // creation timestamp, microseconds
}; // GroupSchema

// Metadata describing one data file of a group.
struct GroupFileSchema {
    // Lifecycle states of a group file.
    typedef enum {
        NEW,
        RAW,
        TO_INDEX,
        INDEX,
        TO_DELETE,
    } FILE_TYPE;

    size_t id = 0;               // database row id
    std::string group_id;        // owning group
    std::string file_id;         // unique file identifier
    int file_type = NEW;         // one of FILE_TYPE
    size_t size = 0;             // file size in bytes
    DateT date = EmptyDate;      // partition date
    uint16_t dimension = 0;      // vector dimension
    std::string location = "";   // filesystem path of the file
    long updated_time = 0;       // last-update timestamp, microseconds
    long created_on = 0;         // creation timestamp, microseconds
}; // GroupFileSchema

typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;

} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -3,9 +3,15 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <stdlib.h>
#include <assert.h>
#include <easylogging++.h>
#include <boost/algorithm/string.hpp>
#include "Options.h"
#include "Env.h"
#include "DBMetaImpl.h"
#include "Exception.h"
namespace zilliz {
namespace vecwise {
......@@ -15,10 +21,54 @@ Options::Options()
: env(Env::Default()) {
}
/* DBMetaOptions::DBMetaOptions(const std::string& dbpath, */
/* const std::string& uri) */
/* : path(dbpath), backend_uri(uri) { */
/* } */
// Builds an archive configuration from a type ("delete" or "swap") and a
// criterias spec such as "disk:512" or "days:100;disk:200".
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {
ParseType(type);
ParseCritirias(criterias);
}
// Parses a criterias spec of the form "key:value[;key:value...]" where
// key is "disk" or "days". Malformed pairs and unknown keys are logged
// and skipped; a non-numeric value throws InvalidArgumentException and
// a value outside int range throws OutOfRangeException.
void ArchiveConf::ParseCritirias(const std::string& criterias) {
    std::vector<std::string> tokens;
    boost::algorithm::split(tokens, criterias, boost::is_any_of(";"));

    if (tokens.size() == 0) {
        return;
    }

    for (auto& token : tokens) {
        std::vector<std::string> kv;
        boost::algorithm::split(kv, token, boost::is_any_of(":"));
        if (kv.size() != 2) {
            LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
            continue;
        }
        if (kv[0] != "disk" && kv[0] != "days") {
            LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
            continue;
        }

        try {
            auto value = std::stoi(kv[1]);
            criterias_[kv[0]] = value;
        }
        catch (std::out_of_range&){
            LOG(ERROR) << "Out of range: '" << kv[1] << "'";
            throw OutOfRangeException();
        }
        catch (...){
            LOG(ERROR) << "Invalid argument: '" << kv[1] << "'";
            throw InvalidArgumentException();
        }
    }
}
// Validates and stores the archive type; only "delete" and "swap" are accepted.
void ArchiveConf::ParseType(const std::string& type) {
    if (type == "delete" || type == "swap") {
        type_ = type;
        return;
    }
    LOG(ERROR) << "Invalid argument: type='" << type << "'";
    throw InvalidArgumentException();
}
} // namespace engine
} // namespace vecwise
......
......@@ -7,6 +7,7 @@
#include <string>
#include <memory>
#include <map>
namespace zilliz {
namespace vecwise {
......@@ -14,10 +15,26 @@ namespace engine {
class Env;
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
ArchiveConf(const std::string& type, const std::string& criterias = "disk:512");
const std::string& GetType() const { return type_; }
const CriteriaT GetCriterias() const { return criterias_; }
private:
void ParseCritirias(const std::string& type);
void ParseType(const std::string& criterias);
std::string type_;
CriteriaT criterias_;
};
struct DBMetaOptions {
/* DBMetaOptions(const std::string&, const std::string&); */
std::string path;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions
......@@ -25,7 +42,7 @@ struct Options {
Options();
uint16_t memory_sync_interval = 1;
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = 1024*1024*256;
size_t index_trigger_size = 1024*1024*1024;
Env* env;
DBMetaOptions meta;
}; // Options
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include "Utils.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace utils {

// Current wall-clock time as microseconds since the Unix epoch.
long GetMicroSecTimeStamp() {
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    using std::chrono::system_clock;
    return static_cast<long>(
        duration_cast<microseconds>(system_clock::now().time_since_epoch()).count());
}

} // namespace utils
} // namespace engine
} // namespace vecwise
} // namespace zilliz
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
namespace zilliz {
namespace vecwise {
namespace engine {
namespace utils {
// Current wall-clock time in microseconds since the Unix epoch
// (system_clock-based; see Utils.cpp).
long GetMicroSecTimeStamp();
} // namespace utils
} // namespace engine
} // namespace vecwise
} // namespace zilliz
......@@ -9,9 +9,103 @@
#include "utils.h"
#include "db/DB.h"
#include "db/DBImpl.h"
#include "db/MetaConsts.h"
using namespace zilliz::vecwise;
// Exercises ArchiveConf parsing: the default criteria, both supported
// archive types, explicit "disk"/"days" criteria, a multi-criteria
// string, and rejection of malformed input.
TEST_F(DBTest, CONFIG_TEST) {
    {
        // Unknown archive types must be rejected at construction.
        ASSERT_ANY_THROW(engine::ArchiveConf conf("wrong"));
        /* EXPECT_DEATH(engine::ArchiveConf conf("wrong"), ""); */
    }
    {
        // "delete" with no criteria string falls back to the default disk:512.
        engine::ArchiveConf delete_conf("delete");
        ASSERT_EQ(delete_conf.GetType(), "delete");
        auto parsed = delete_conf.GetCriterias();
        ASSERT_TRUE(parsed.size() == 1);
        ASSERT_TRUE(parsed["disk"] == 512);
    }
    {
        // The same default applies to the "swap" type.
        engine::ArchiveConf swap_conf("swap");
        ASSERT_EQ(swap_conf.GetType(), "swap");
        auto parsed = swap_conf.GetCriterias();
        ASSERT_TRUE(parsed.size() == 1);
        ASSERT_TRUE(parsed["disk"] == 512);
    }
    {
        // Malformed disk values (missing or non-numeric) must throw.
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "disk:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "disk:a"));
        engine::ArchiveConf disk_conf("swap", "disk:1024");
        auto parsed = disk_conf.GetCriterias();
        ASSERT_TRUE(parsed.size() == 1);
        ASSERT_TRUE(parsed["disk"] == 1024);
    }
    {
        // Malformed days values must throw as well.
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a"));
        engine::ArchiveConf days_conf("swap", "days:100");
        auto parsed = days_conf.GetCriterias();
        ASSERT_TRUE(parsed.size() == 1);
        ASSERT_TRUE(parsed["days"] == 100);
    }
    {
        // Multiple criteria may be combined with ';'.
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a"));
        engine::ArchiveConf combo_conf("swap", "days:100;disk:200");
        auto parsed = combo_conf.GetCriterias();
        ASSERT_TRUE(parsed.size() == 2);
        ASSERT_TRUE(parsed["days"] == 100);
        ASSERT_TRUE(parsed["disk"] == 200);
    }
}
// Inserts vectors well past the DBTest2 "disk:1" archive cap, then checks
// that auto-archiving kept the reported DB size under 1G.
// NOTE(review): "ARHIVE" typo kept — renaming would change the registered
// gtest name that filters/CI may reference.
TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
    static const std::string group_name = "test_group";
    static const int group_dim = 256;
    long size;

    engine::meta::GroupSchema group_info;
    group_info.dimension = group_dim;
    group_info.group_id = group_name;
    engine::Status stat = db_->add_group(group_info);

    engine::meta::GroupSchema group_info_get;
    group_info_get.group_id = group_name;
    stat = db_->get_group(group_info_get);
    ASSERT_STATS(stat);
    ASSERT_EQ(group_info_get.dimension, group_dim);

    // fix: dropped the unused `engine::IDNumbers target_ids` local.
    engine::IDNumbers vector_ids;

    db_->size(size);

    // Build nb random vectors of dimension d; the first component is
    // nudged per-row so vectors are not identical.
    int d = 256;
    int nb = 20;
    float* xb = new float[d * nb];
    for (int i = 0; i < nb; i++) {
        for (int j = 0; j < d; j++) xb[d * i + j] = drand48();
        xb[d * i] += i / 2000.;
    }

    // Insert far more data than the 1G cap allows; the short sleeps let the
    // background sync/archive threads interleave with insertion.
    int loop = 100000;
    for (auto i = 0; i < loop; ++i) {
        db_->add_vectors(group_name, nb, xb, vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    db_->size(size);
    LOG(DEBUG) << "size=" << size;
    ASSERT_TRUE(size < 1 * engine::meta::G);

    delete[] xb;
}
// fix: removed the stray ';' that followed the original closing brace.
TEST_F(DBTest, DB_TEST) {
static const std::string group_name = "test_group";
......@@ -63,7 +157,7 @@ TEST_F(DBTest, DB_TEST) {
START_TIMER;
stat = db_->search(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
......
......@@ -6,10 +6,14 @@
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <stdlib.h>
#include <time.h>
#include "utils.h"
#include "db/DBMetaImpl.h"
#include "db/Factories.h"
#include "db/Utils.h"
#include "db/MetaConsts.h"
using namespace zilliz::vecwise::engine;
......@@ -59,10 +63,124 @@ TEST_F(MetaTest, GROUP_FILE_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.file_type, new_file_type);
/* group_file.file_type = meta::GroupFileSchema::NEW; */
/* status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file); */
/* ASSERT_TRUE(status.ok()); */
/* ASSERT_EQ(group_file.file_type, new_file_type); */
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl_->delete_group_partitions(group_file.group_id, dates);
ASSERT_FALSE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
}
status = impl_->delete_group_partitions(group_file.group_id, dates);
ASSERT_TRUE(status.ok());
group_file.date = meta::Meta::GetDateWithDelta(-2);
status = impl_->update_group_file(group_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
dates.clear();
dates.push_back(group_file.date);
status = impl_->delete_group_partitions(group_file.group_id, dates);
ASSERT_TRUE(status.ok());
status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
}
// Files back-dated beyond the configured "days:N" threshold must be marked
// TO_DELETE by archive_files(); newer files must stay NEW.
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
    srand(time(0));
    DBMetaOptions options;
    options.path = "/tmp/vecwise_test";

    // fix: was `rand() % 100`, which can yield 0 and makes the
    // `rand() % (days_num * 2)` below a modulo-by-zero (undefined behavior).
    int days_num = rand() % 100 + 1;
    std::stringstream ss;
    ss << "days:" << days_num;
    options.archive_conf = ArchiveConf("delete", ss.str());

    auto impl = meta::DBMetaImpl(options);
    auto group_id = "meta_test_group";
    meta::GroupSchema group;
    group.group_id = group_id;
    auto status = impl.add_group(group);

    meta::GroupFilesSchema files;
    meta::GroupFileSchema group_file;
    group_file.group_id = group.group_id;
    auto cnt = 100;
    long ts = utils::GetMicroSecTimeStamp();
    std::vector<int> days;
    for (auto i = 0; i < cnt; ++i) {
        status = impl.add_group_file(group_file);
        group_file.file_type = meta::GroupFileSchema::NEW;
        // Back-date created_on by `day` days (plus a small margin) so
        // roughly half the files fall past the archive threshold.
        int day = rand() % (days_num * 2);
        group_file.created_on = ts - day * meta::D_SEC * meta::US_PS - 10000;
        status = impl.update_group_file(group_file);
        files.push_back(group_file);
        days.push_back(day);
    }

    impl.archive_files();

    // Re-read each file from the meta store and check its fate against the
    // age we gave it.
    int i = 0;
    for (auto& file : files) {
        status = impl.get_group_file(file.group_id, file.file_id, file);
        ASSERT_TRUE(status.ok());
        if (days[i] < days_num) {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
        } else {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::TO_DELETE);
        }
        i++;
    }

    impl.drop_all();
}
// With a "disk:11" cap and ten 2G files, archive_files() must mark the
// five earliest-inserted files TO_DELETE to get under the cap and leave
// the remaining five untouched.
TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
    DBMetaOptions options;
    options.path = "/tmp/vecwise_test";
    options.archive_conf = ArchiveConf("delete", "disk:11");
    auto impl = meta::DBMetaImpl(options);

    auto group_id = "meta_test_group";
    meta::GroupSchema group;
    group.group_id = group_id;
    auto status = impl.add_group(group);

    meta::GroupFilesSchema inserted;
    meta::GroupFileSchema record;
    record.group_id = group.group_id;
    auto total = 10;
    auto each_gb = 2UL;
    for (auto n = 0; n < total; ++n) {
        status = impl.add_group_file(record);
        record.file_type = meta::GroupFileSchema::NEW;
        record.size = each_gb * meta::G;
        status = impl.update_group_file(record);
        inserted.push_back(record);
    }

    impl.archive_files();

    // Re-read every file: the first five should have been archived away.
    int idx = 0;
    for (auto file : inserted) {
        status = impl.get_group_file(file.group_id, file.file_id, file);
        ASSERT_TRUE(status.ok());
        if (idx < 5) {
            ASSERT_TRUE(file.file_type == meta::GroupFileSchema::TO_DELETE);
        } else {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
        }
        ++idx;
    }

    impl.drop_all();
}
TEST_F(MetaTest, GROUP_FILES_TEST) {
......
......@@ -29,19 +29,30 @@ void DBTest::InitLog() {
el::Loggers::reconfigureLogger("default", defaultConf);
}
void DBTest::SetUp() {
InitLog();
// Base options for DBTest fixtures: factory defaults with the meta store
// rooted under /tmp/vecwise_test.
engine::Options DBTest::GetOptions() {
    auto opts = engine::OptionsFactory::Build();
    opts.meta.path = "/tmp/vecwise_test";
    return opts;
}
// Per-test setup: configure logging, then build a DB instance (owned by
// db_, released in TearDown) using this fixture's GetOptions().
void DBTest::SetUp() {
InitLog();
auto options = GetOptions();
// "Faiss,IDMap" — presumably selects the engine backend; confirm against DBFactory.
db_ = engine::DBFactory::Build(options, "Faiss,IDMap");
}
// Per-test teardown: destroy the DB instance, then wipe the on-disk test
// directory so the next test starts from a clean state.
void DBTest::TearDown() {
    delete db_;
    // fix: dropped an unused `engine::OptionsFactory::Build()` local whose
    // result was never read.
    boost::filesystem::remove_all("/tmp/vecwise_test");
}
// DBTest2 variant: same test path, but with a delete-type archive policy
// capped at 1G of disk so auto-archiving triggers quickly in tests.
engine::Options DBTest2::GetOptions() {
    auto opts = engine::OptionsFactory::Build();
    opts.meta.path = "/tmp/vecwise_test";
    opts.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
    return opts;
}
void MetaTest::SetUp() {
InitLog();
impl_ = engine::DBMetaImplFactory::Build();
......
......@@ -39,6 +39,12 @@ protected:
void InitLog();
virtual void SetUp() override;
virtual void TearDown() override;
virtual zilliz::vecwise::engine::Options GetOptions();
};
// Fixture identical to DBTest except for its GetOptions() override, which
// enables a tight disk-based archive policy for archive tests.
class DBTest2 : public DBTest {
protected:
virtual zilliz::vecwise::engine::Options GetOptions() override;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册