提交 48626e33 编写于 作者: Z zhiru 提交者: jinhai

Add new mem manager


Former-commit-id: 639e28832e5dfb1cdbdbc7a47177520a9ad4f60f
上级 f7e11e87
......@@ -18,6 +18,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl
## New Feature
- MS-137 - Integrate knowhere
- MS-180 - Add new mem manager
## Task
......
......@@ -2,7 +2,7 @@ server_config:
address: 0.0.0.0
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster
mode: single # milvus deployment type: single, cluster, read_only
db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path
......@@ -15,6 +15,8 @@ db_config:
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
maximum_memory: 4 # maximum memory allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of maximum_memory and cpu_cache_capacity should be less than total memory
metric_config:
is_startup: off # if monitoring start: on, off
......
......@@ -11,6 +11,9 @@ namespace engine {
const size_t K = 1024UL;
const size_t M = K*K;
const size_t G = K*M;
const size_t T = K*G;
const size_t MAX_TABLE_FILE_MEM = 128 * M;
const int VECTOR_TYPE_SIZE = sizeof(float);
......
......@@ -8,6 +8,7 @@
#include "MetaConsts.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include "Log.h"
#include <iostream>
#include <sstream>
......@@ -128,6 +129,10 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
LOG(DEBUG) << "MemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() <<
", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem();
MemVectorsPtr mem = GetMemByTable(table_id);
if (mem == nullptr) {
return Status::NotFound("Group " + table_id + " not found!");
......@@ -192,6 +197,26 @@ Status MemManager::EraseMemVector(const std::string& table_id) {
return Status::OK();
}
size_t MemManager::GetCurrentMutableMem() {
size_t totalMem = 0;
for (auto& kv : mem_id_map_) {
auto memVector = kv.second;
totalMem += memVector->Size();
}
return totalMem;
}
size_t MemManager::GetCurrentImmutableMem() {
size_t totalMem = 0;
for (auto& memVector : immu_mem_list_) {
totalMem += memVector->Size();
}
return totalMem;
}
size_t MemManager::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem();
}
} // namespace engine
} // namespace milvus
......
......@@ -78,6 +78,12 @@ public:
Status EraseMemVector(const std::string& table_id) override;
size_t GetCurrentMutableMem() override;
size_t GetCurrentImmutableMem() override;
size_t GetCurrentMem() override;
private:
MemVectorsPtr GetMemByTable(const std::string& table_id);
......
......@@ -16,6 +16,12 @@ public:
virtual Status EraseMemVector(const std::string& table_id) = 0;
virtual size_t GetCurrentMutableMem() = 0;
virtual size_t GetCurrentImmutableMem() = 0;
virtual size_t GetCurrentMem() = 0;
}; // MemManagerAbstract
using MemManagerAbstractPtr = std::shared_ptr<MemManagerAbstract>;
......
......@@ -49,13 +49,15 @@ size_t MemTable::GetTableFileCount() {
}
Status MemTable::Serialize() {
for (auto& memTableFile : mem_table_file_list_) {
auto status = memTableFile->Serialize();
for (auto memTableFile = mem_table_file_list_.begin(); memTableFile != mem_table_file_list_.end(); ) {
auto status = (*memTableFile)->Serialize();
if (!status.ok()) {
std::string errMsg = "MemTable::Serialize failed: " + status.ToString();
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
std::lock_guard<std::mutex> lock(mutex_);
memTableFile = mem_table_file_list_.erase(memTableFile);
}
return Status::OK();
}
......@@ -64,10 +66,19 @@ bool MemTable::Empty() {
return mem_table_file_list_.empty();
}
std::string MemTable::GetTableId() {
const std::string& MemTable::GetTableId() const {
return table_id_;
}
size_t MemTable::GetCurrentMem() {
std::lock_guard<std::mutex> lock(mutex_);
size_t totalMem = 0;
for (auto& memTableFile : mem_table_file_list_) {
totalMem += memTableFile->GetCurrentMem();
}
return totalMem;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -4,7 +4,7 @@
#include "MemTableFile.h"
#include "VectorSource.h"
#include <stack>
#include <mutex>
namespace zilliz {
namespace milvus {
......@@ -30,7 +30,9 @@ public:
bool Empty();
std::string GetTableId();
const std::string& GetTableId() const;
size_t GetCurrentMem();
private:
const std::string table_id_;
......@@ -41,6 +43,8 @@ private:
Options options_;
std::mutex mutex_;
}; //MemTable
} // namespace engine
......
#include "NewMemManager.h"
#include "VectorSource.h"
#include "Log.h"
#include "Constants.h"
#include <thread>
namespace zilliz {
namespace milvus {
......@@ -20,6 +24,9 @@ Status NewMemManager::InsertVectors(const std::string& table_id_,
const float* vectors_,
IDNumbers& vector_ids_) {
while (GetCurrentMem() > options_.maximum_memory) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::unique_lock<std::mutex> lock(mutex_);
......@@ -30,6 +37,10 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
LOG(DEBUG) << "NewMemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() <<
", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem();
MemTablePtr mem = GetMemByTable(table_id);
VectorSource::Ptr source = std::make_shared<VectorSource>(n, vectors);
......@@ -64,6 +75,12 @@ Status NewMemManager::Serialize(std::set<std::string>& table_ids) {
table_ids.insert(mem->GetTableId());
}
immu_mem_list_.clear();
// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) {
// (*mem)->Serialize();
// table_ids.insert((*mem)->GetTableId());
// mem = immu_mem_list_.erase(mem);
// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size();
// }
return Status::OK();
}
......@@ -87,6 +104,27 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) {
return Status::OK();
}
size_t NewMemManager::GetCurrentMutableMem() {
size_t totalMem = 0;
for (auto& kv : mem_id_map_) {
auto memTable = kv.second;
totalMem += memTable->GetCurrentMem();
}
return totalMem;
}
size_t NewMemManager::GetCurrentImmutableMem() {
size_t totalMem = 0;
for (auto& memTable : immu_mem_list_) {
totalMem += memTable->GetCurrentMem();
}
return totalMem;
}
size_t NewMemManager::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem();
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -31,6 +31,12 @@ public:
Status EraseMemVector(const std::string& table_id) override;
size_t GetCurrentMutableMem() override;
size_t GetCurrentImmutableMem() override;
size_t GetCurrentMem() override;
private:
MemTablePtr GetMemByTable(const std::string& table_id);
......
......@@ -61,6 +61,7 @@ struct Options {
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
float maximum_memory = 4 * ONE_GB;
}; // Options
......
......@@ -23,6 +23,14 @@ DBWrapper::DBWrapper() {
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
float maximum_memory = config.GetFloatValue(CONFIG_MAXMIMUM_MEMORY);
if (maximum_memory > 1.0) {
opt.maximum_memory = maximum_memory * engine::ONE_GB;
}
else {
std::cout << "ERROR: maximum_memory should be at least 1 GB" << std::endl;
kill(0, SIGUSR1);
}
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
......
......@@ -26,6 +26,7 @@ static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const std::string CONFIG_MAXMIMUM_MEMORY = "maximum_memory";
static const std::string CONFIG_LOG = "log_config";
......
......@@ -8,6 +8,8 @@
#include "db/Constants.h"
#include "db/EngineFactory.h"
#include "metrics/Metrics.h"
#include "db/MetaConsts.h"
#include "boost/filesystem.hpp"
#include <thread>
#include <fstream>
......@@ -34,9 +36,6 @@ namespace {
vectors.clear();
vectors.resize(n*TABLE_DIM);
float* data = vectors.data();
// std::random_device rd;
// std::mt19937 gen(rd());
// std::uniform_real_distribution<> dis(0.0, 1.0);
for(int i = 0; i < n; i++) {
for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
data[TABLE_DIM * i] += i / 2000.;
......@@ -44,7 +43,7 @@ namespace {
}
}
TEST(MEM_TEST, VECTOR_SOURCE_TEST) {
TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
......@@ -91,7 +90,7 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) {
ASSERT_TRUE(status.ok());
}
TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
......@@ -135,7 +134,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
ASSERT_TRUE(status.ok());
}
TEST(MEM_TEST, MEM_TABLE_TEST) {
TEST_F(NewMemManagerTest, MEM_TABLE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
......@@ -201,7 +200,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) {
ASSERT_TRUE(status.ok());
}
TEST(MEM_TEST, MEM_MANAGER_TEST) {
TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
......@@ -218,7 +217,6 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) {
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
std::map<int64_t , std::vector<float>> search_vectors;
// std::map<int64_t , std::vector<float>> vectors_ids_map;
{
engine::IDNumbers vector_ids;
int64_t nb = 1024000;
......@@ -227,24 +225,13 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) {
engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_TRUE(status.ok());
// std::ofstream myfile("mem_test.txt");
// for (int64_t i = 0; i < nb; ++i) {
// int64_t vector_id = vector_ids[i];
// std::vector<float> vectors;
// for (int64_t j = 0; j < TABLE_DIM; j++) {
// vectors.emplace_back(xb[i*TABLE_DIM + j]);
//// std::cout << xb[i*TABLE_DIM + j] << std::endl;
// }
// vectors_ids_map[vector_id] = vectors;
// }
std::this_thread::sleep_for(std::chrono::seconds(3));
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int64_t> dis(0, nb - 1);
int64_t numQuery = 1000;
int64_t numQuery = 20;
for (int64_t i = 0; i < numQuery; ++i) {
int64_t index = dis(gen);
std::vector<float> search;
......@@ -252,17 +239,7 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) {
search.push_back(xb[index * TABLE_DIM + j]);
}
search_vectors.insert(std::make_pair(vector_ids[index], search));
// std::cout << "index: " << index << " vector_ids[index]: " << vector_ids[index] << std::endl;
}
// for (int64_t i = 0; i < nb; i += 100000) {
// std::vector<float> search;
// for (int64_t j = 0; j < TABLE_DIM; j++) {
// search.push_back(xb[i * TABLE_DIM + j]);
// }
// search_vectors.insert(std::make_pair(vector_ids[i], search));
// }
}
int k = 10;
......@@ -270,26 +247,16 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) {
auto& search = pair.second;
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, 1, search.data(), results);
for(int t = 0; t < k; t++) {
// std::cout << "ID=" << results[0][t].first << " DISTANCE=" << results[0][t].second << std::endl;
// std::cout << vectors_ids_map[results[0][t].first].size() << std::endl;
// for (auto& data : vectors_ids_map[results[0][t].first]) {
// std::cout << data << " ";
// }
// std::cout << std::endl;
}
// std::cout << "results[0][0].first: " << results[0][0].first << " pair.first: " << pair.first << " results[0][0].second: " << results[0][0].second << std::endl;
ASSERT_EQ(results[0][0].first, pair.first);
ASSERT_LT(results[0][0].second, 0.00001);
}
stat = db_->DropAll();
ASSERT_TRUE(stat.ok());
delete db_;
boost::filesystem::remove_all(options.meta.path);
}
TEST(MEM_TEST, INSERT_TEST) {
TEST_F(NewMemManagerTest, INSERT_TEST) {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
......@@ -307,9 +274,9 @@ TEST(MEM_TEST, INSERT_TEST) {
auto start_time = METRICS_NOW_TIME;
int insert_loop = 1000;
int insert_loop = 20;
for (int i = 0; i < insert_loop; ++i) {
int64_t nb = 204800;
int64_t nb = 409600;
std::vector<float> xb;
BuildVectors(nb, xb);
engine::IDNumbers vector_ids;
......@@ -318,10 +285,91 @@ TEST(MEM_TEST, INSERT_TEST) {
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
std::cout << "total_time(ms) : " << total_time << std::endl;
LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time;
stat = db_->DropAll();
ASSERT_TRUE(stat.ok());
delete db_;
boost::filesystem::remove_all(options.meta.path);
}
TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int64_t nb = 409600;
std::vector<float> xb;
BuildVectors(nb, xb);
int64_t qb = 5;
std::vector<float> qxb;
BuildVectors(qb, qxb);
std::thread search([&]() {
engine::QueryResults results;
int k = 10;
std::this_thread::sleep_for(std::chrono::seconds(2));
INIT_TIMER;
std::stringstream ss;
uint64_t count = 0;
uint64_t prev_count = 0;
for (auto j=0; j<10; ++j) {
ss.str("");
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
for (auto k=0; k<qb; ++k) {
ASSERT_EQ(results[k][0].first, target_ids[k]);
ss.str("");
ss << "Result [" << k << "]:";
for (auto result : results[k]) {
ss << result.first << " ";
}
/* LOG(DEBUG) << ss.str(); */
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
int loop = 20;
for (auto i=0; i<loop; ++i) {
if (i==0) {
db_->InsertVectors(TABLE_NAME, qb, qxb.data(), target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
search.join();
delete db_;
boost::filesystem::remove_all(options.meta.path);
};
......@@ -106,6 +106,18 @@ zilliz::milvus::engine::Options MySQLDBTest::GetOptions() {
return options;
}
void NewMemManagerTest::InitLog() {
el::Configurations defaultConf;
defaultConf.setToDefault();
defaultConf.set(el::Level::Debug,
el::ConfigurationType::Format, "[%thread-%datetime-%level]: %msg (%fbase:%line)");
el::Loggers::reconfigureLogger("default", defaultConf);
}
void NewMemManagerTest::SetUp() {
InitLog();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
if (argc > 1) {
......
......@@ -87,3 +87,8 @@ class MySQLDBTest : public ::testing::Test {
protected:
zilliz::milvus::engine::Options GetOptions();
};
class NewMemManagerTest : public ::testing::Test {
void InitLog();
virtual void SetUp() override;
};
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册