未验证 提交 3735e3d1 编写于 作者: B BossZou 提交者: GitHub

Improve MysqlEngine (#3227)

* Mysql
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* .
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* MysqlEngine thread aware
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Update case
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Optimize mysqlengine threads
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>
上级 3ec8c382
......@@ -73,7 +73,7 @@ class MetaAdapter {
template <typename ResourceT>
Status
Apply(snapshot::ResourceContextPtr<ResourceT> resp, int64_t& result_id) {
Execute(snapshot::ResourceContextPtr<ResourceT> resp, int64_t& result_id) {
auto session = CreateSession();
STATUS_CHECK(session->Apply<ResourceT>(resp));
......@@ -81,7 +81,7 @@ class MetaAdapter {
STATUS_CHECK(session->Commit(result_ids));
if (result_ids.size() != 1) {
return Status(DB_ERROR, "Result id is wrong");
return Status(DB_ERROR, "Result id size is wrong");
}
result_id = result_ids.at(0);
......
......@@ -16,8 +16,7 @@
#include "db/Options.h"
#include "db/meta/MetaAdapter.h"
namespace milvus {
namespace engine {
namespace milvus::engine {
class MetaFactory {
public:
......@@ -28,5 +27,4 @@ class MetaFactory {
Build(const DBMetaOptions& meta_options);
};
} // namespace engine
} // namespace milvus
} // namespace milvus::engine
......@@ -31,9 +31,21 @@ MySQLConnectionPool::grab() {
full_.wait(lock, [this] { return conns_in_use_ < max_pool_size_; });
++conns_in_use_;
}
full_.notify_one();
return mysqlpp::ConnectionPool::grab();
}
mysqlpp::Connection*
MySQLConnectionPool::safe_grab() {
{
std::unique_lock<std::mutex> lock(mutex_);
full_.wait(lock, [this] { return conns_in_use_ < max_pool_size_; });
++conns_in_use_;
}
full_.notify_one();
return mysqlpp::ConnectionPool::safe_grab();
}
// Other half of in-use conn count limit
void
MySQLConnectionPool::release(const mysqlpp::Connection* pc) {
......
......@@ -44,6 +44,9 @@ class MySQLConnectionPool : public mysqlpp::ConnectionPool {
mysqlpp::Connection*
grab() override;
mysqlpp::Connection*
safe_grab() override;
// Other half of in-use conn count limit
void
release(const mysqlpp::Connection* pc) override;
......
......@@ -12,6 +12,7 @@
#include "db/meta/backend/MySqlEngine.h"
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
......@@ -208,25 +209,33 @@ MySqlEngine::Initialize() {
Status
MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
auto status = Status::OK();
mysqlpp::Connection::thread_start();
try {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr || !connectionPtr->connected()) {
mysqlpp::Connection::thread_end();
return Status(SS_TIMEOUT, "Mysql server is not accessed");
}
std::string sql;
auto status = MetaHelper::MetaQueryContextToSql(context, sql);
status = MetaHelper::MetaQueryContextToSql(context, sql);
if (!status.ok()) {
mysqlpp::Connection::thread_end();
return status;
}
std::lock_guard<std::mutex> lock(meta_mutex_);
// std::lock_guard<std::mutex> lock(meta_mutex_);
mysqlpp::Query query = connectionPtr->query(sql);
auto res = query.store();
if (!res) {
// TODO: change error behavior
throw Exception(1, "Query res is false");
mysqlpp::Connection::thread_end();
std::stringstream ss;
ss << "Mysql query fail: (" << query.errnum() << ": " << query.error() << ")";
std::string err_msg = ss.str();
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
auto names = res.field_names();
......@@ -238,48 +247,61 @@ MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
attrs.push_back(attrs_map);
}
} catch (const mysqlpp::ConnectionFailed& er) {
return Status(SS_TIMEOUT, er.what());
status = Status(SS_TIMEOUT, er.what());
} catch (const mysqlpp::BadQuery& er) {
LOG_ENGINE_ERROR_ << "Query error: " << er.what();
if (er.errnum() == 2006) {
return Status(SS_TIMEOUT, er.what());
status = Status(SS_TIMEOUT, er.what());
} else {
status = Status(DB_ERROR, er.what());
}
return Status(1, er.what());
} catch (const mysqlpp::BadConversion& er) {
// Handle bad conversions
// cerr << "Conversion error: " << er.what() << endl <<
// "\tretrieved data size: " << er.retrieved <<
// ", actual size: " << er.actual_size << endl;
return Status(1, er.what());
status = Status(DB_ERROR, er.what());
} catch (const mysqlpp::Exception& er) {
// Catch-all for any other MySQL++ exceptions
// cerr << "Error: " << er.what() << endl;
return Status(1, er.what());
status = Status(DB_ERROR, er.what());
}
return Status::OK();
mysqlpp::Connection::thread_end();
return status;
}
Status
MySqlEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_contexts, std::vector<int64_t>& result_ids) {
Status status;
mysqlpp::Connection::thread_start();
try {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr || !connectionPtr->connected()) {
mysqlpp::Connection::thread_end();
return Status(SS_TIMEOUT, "Mysql server is not accessed");
}
mysqlpp::Transaction trans(*connectionPtr, mysqlpp::Transaction::serializable, mysqlpp::Transaction::session);
std::lock_guard<std::mutex> lock(meta_mutex_);
// std::lock_guard<std::mutex> lock(meta_mutex_);
for (auto& context : sql_contexts) {
std::string sql;
auto status = MetaHelper::MetaApplyContextToSql(context, sql);
status = MetaHelper::MetaApplyContextToSql(context, sql);
if (!status.ok()) {
return status;
break;
}
auto query = connectionPtr->query(sql);
auto res = query.execute();
if (!res) {
std::stringstream ss;
ss << "Mysql execute fail: (" << query.errnum() << ": " << query.error() << ")";
status = Status(DB_ERROR, ss.str());
break;
}
if (context.op_ == oAdd) {
auto id = res.insert_id();
result_ids.push_back(id);
......@@ -288,15 +310,20 @@ MySqlEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_context
}
}
trans.commit();
if (status.ok()) {
trans.commit();
} else {
trans.rollback();
}
} catch (const mysqlpp::ConnectionFailed& er) {
return Status(SS_TIMEOUT, er.what());
status = Status(SS_TIMEOUT, er.what());
} catch (const mysqlpp::BadQuery& er) {
LOG_ENGINE_ERROR_ << "MySql Error Code: " << er.errnum() << ": " << er.what();
if (er.errnum() == 2006) {
return Status(SS_TIMEOUT, er.what());
status = Status(SS_TIMEOUT, er.what());
} else {
status = Status(DB_ERROR, er.what());
}
return Status(SERVER_UNSUPPORTED_ERROR, er.what());
} catch (const mysqlpp::BadConversion& er) {
// Handle bad conversions
// cerr << "Conversion error: " << er.what() << endl <<
......@@ -304,16 +331,17 @@ MySqlEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_context
// ", actual size: " << er.actual_size << endl;
// return -1;
// std::cout << "[DB] Error: " << er.what() << std::endl;
return Status(SERVER_UNSUPPORTED_ERROR, er.what());
status = Status(DB_ERROR, er.what());
} catch (const mysqlpp::Exception& er) {
// Catch-all for any other MySQL++ exceptions
// cerr << "Error: " << er.what() << endl;
// return -1;
// std::cout << "[DB] Error: " << er.what() << std::endl;
return Status(SERVER_UNSUPPORTED_ERROR, er.what());
status = Status(DB_ERROR, er.what());
}
return Status::OK();
mysqlpp::Connection::thread_end();
return status;
}
Status
......
......@@ -102,7 +102,6 @@ class ResourceContextBuilder {
SetResource(typename T::Ptr res) {
table_ = T::Name;
id_ = res->GetID();
// resource_ = std::shared_ptr<T>(std::move(res));
resource_ = std::move(res);
return *this;
}
......
......@@ -44,9 +44,7 @@
#include <utility>
#include <vector>
namespace milvus {
namespace engine {
namespace snapshot {
namespace milvus::engine::snapshot {
class Store;
struct ApplyContext {
......@@ -186,7 +184,7 @@ class Store : public std::enable_shared_from_this<Store> {
ResourceContextBuilder<ResourceT>().SetTable(ResourceT::Name).SetOp(meta::oDelete).SetID(id).CreatePtr();
int64_t result_id;
return adapter_->Apply<ResourceT>(rc_ctx_p, result_id);
return adapter_->Execute<ResourceT>(rc_ctx_p, result_id);
}
IDS_TYPE
......@@ -230,7 +228,7 @@ class Store : public std::enable_shared_from_this<Store> {
auto res_ctx_p = ResourceContextBuilder<ResourceT>().SetOp(meta::oAdd).SetResource(res_p).CreatePtr();
int64_t result_id;
auto status = adapter_->Apply<ResourceT>(res_ctx_p, result_id);
auto status = adapter_->Execute<ResourceT>(res_ctx_p, result_id);
if (!status.ok()) {
return status;
}
......@@ -376,7 +374,7 @@ class Store : public std::enable_shared_from_this<Store> {
.AddAttr(meta::F_STATE)
.CreatePtr();
adapter_->Apply<Collection>(t_c_p, result_id);
adapter_->Execute<Collection>(t_c_p, result_id);
} else if (record.type() == typeid(std::shared_ptr<CollectionCommit>)) {
const auto& r = std::any_cast<std::shared_ptr<CollectionCommit>>(record);
r->Activate();
......@@ -385,7 +383,7 @@ class Store : public std::enable_shared_from_this<Store> {
.SetResource(r)
.AddAttr(meta::F_STATE)
.CreatePtr();
adapter_->Apply<CollectionCommit>(t_cc_p, result_id);
adapter_->Execute<CollectionCommit>(t_cc_p, result_id);
}
}
}
......@@ -396,6 +394,4 @@ class Store : public std::enable_shared_from_this<Store> {
};
using StorePtr = Store::Ptr;
} // namespace snapshot
} // namespace engine
} // namespace milvus
} // namespace milvus::engine::snapshot
......@@ -15,7 +15,7 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_snapshot.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_segment.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_meta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_meta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_job.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_task.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp
......
......@@ -13,22 +13,24 @@
#include "db/meta/backend/MetaContext.h"
#include "db/snapshot/ResourceContext.h"
#include "db/utils.h"
#include "utils/Json.h"
template <typename T>
template<typename T>
using ResourceContext = milvus::engine::snapshot::ResourceContext<T>;
template <typename T>
template<typename T>
using ResourceContextBuilder = milvus::engine::snapshot::ResourceContextBuilder<T>;
using FType = milvus::engine::DataType;
using FEType = milvus::engine::FieldElementType;
using Op = milvus::engine::meta::MetaContextOp;
using State = milvus::engine::snapshot::State;
TEST_F(MetaTest, ApplyTest) {
ID_TYPE result_id;
auto collection = std::make_shared<Collection>("meta_test_c1");
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).CreatePtr();
auto status = meta_->Apply<Collection>(c_ctx, result_id);
auto status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
......@@ -36,13 +38,13 @@ TEST_F(MetaTest, ApplyTest) {
collection->Activate();
auto c2_ctx = ResourceContextBuilder<Collection>().SetResource(collection)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
status = meta_->Apply<Collection>(c2_ctx, result_id);
status = meta_->Execute<Collection>(c2_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
ASSERT_EQ(result_id, collection->GetID());
auto c3_ctx = ResourceContextBuilder<Collection>().SetID(result_id).SetOp(Op::oDelete).CreatePtr();
status = meta_->Apply<Collection>(c3_ctx, result_id);
status = meta_->Execute<Collection>(c3_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
ASSERT_EQ(result_id, collection->GetID());
......@@ -53,29 +55,29 @@ TEST_F(MetaTest, SessionTest) {
auto collection = std::make_shared<Collection>("meta_test_c1");
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).CreatePtr();
auto status = meta_->Apply<Collection>(c_ctx, result_id);
auto status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
auto partition = std::make_shared<Partition>("meta_test_p1", result_id);
auto p_ctx = ResourceContextBuilder<Partition>().SetResource(partition).CreatePtr();
status = meta_->Apply<Partition>(p_ctx, result_id);
status = meta_->Execute<Partition>(p_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
partition->SetID(result_id);
auto field = std::make_shared<Field>("meta_test_f1", 1, FType::INT64);
auto f_ctx = ResourceContextBuilder<Field>().SetResource(field).CreatePtr();
status = meta_->Apply<Field>(f_ctx, result_id);
status = meta_->Execute<Field>(f_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
field->SetID(result_id);
auto field_element = std::make_shared<FieldElement>(collection->GetID(), field->GetID(),
"meta_test_f1_fe1", FEType::FET_RAW);
"meta_test_f1_fe1", FEType::FET_RAW);
auto fe_ctx = ResourceContextBuilder<FieldElement>().SetResource(field_element).CreatePtr();
status = meta_->Apply<FieldElement>(fe_ctx, result_id);
status = meta_->Execute<FieldElement>(fe_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
field_element->SetID(result_id);
......@@ -121,7 +123,7 @@ TEST_F(MetaTest, SelectTest) {
auto collection = std::make_shared<Collection>("meta_test_c1");
ASSERT_TRUE(collection->Activate());
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).CreatePtr();
auto status = meta_->Apply<Collection>(c_ctx, result_id);
auto status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
......@@ -135,7 +137,7 @@ TEST_F(MetaTest, SelectTest) {
auto collection2 = std::make_shared<Collection>("meta_test_c2");
ASSERT_TRUE(collection2->Activate());
auto c2_ctx = ResourceContextBuilder<Collection>().SetResource(collection2).CreatePtr();
status = meta_->Apply<Collection>(c2_ctx, result_id);
status = meta_->Execute<Collection>(c2_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection2->SetID(result_id);
......@@ -144,7 +146,7 @@ TEST_F(MetaTest, SelectTest) {
std::vector<Collection::Ptr> return_collections;
status = meta_->SelectBy<Collection, ID_TYPE>(milvus::engine::meta::F_ID,
{collection2->GetID()}, return_collections);
{collection2->GetID()}, return_collections);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(return_collections.size(), 1);
ASSERT_EQ(return_collections.at(0)->GetID(), collection2->GetID());
......@@ -174,7 +176,7 @@ TEST_F(MetaTest, TruncateTest) {
auto collection = std::make_shared<Collection>("meta_test_c1");
ASSERT_TRUE(collection->Activate());
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).CreatePtr();
auto status = meta_->Apply<Collection>(c_ctx, result_id);
auto status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
......@@ -187,3 +189,117 @@ TEST_F(MetaTest, TruncateTest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(return_collection, nullptr);
}
TEST_F(MetaTest, MultiThreadRequestTest) {
auto request_worker = [&](size_t i) {
std::string collection_name_prefix = "meta_test_collection_" + std::to_string(i) + "_";
int64_t result_id;
for (size_t ii = 0; ii < 30; ii++) {
std::string collection_name = collection_name_prefix + std::to_string(ii);
auto collection = std::make_shared<Collection>(collection_name);
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).CreatePtr();
auto status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
collection->Activate();
auto c_ctx2 = ResourceContextBuilder<Collection>().SetResource(collection)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
status = meta_->Execute<Collection>(c_ctx2, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
CollectionPtr collection2;
status = meta_->Select<Collection>(result_id, collection2);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(collection2->GetID(), result_id);
ASSERT_EQ(collection2->GetState(), State::ACTIVE);
ASSERT_EQ(collection2->GetName(), collection_name);
collection->Deactivate();
auto c_ctx3 = ResourceContextBuilder<Collection>().SetResource(collection)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
status = meta_->Execute<Collection>(c_ctx3, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(result_id, collection->GetID());
auto c_ctx4 = ResourceContextBuilder<Collection>().SetID(result_id)
.SetOp(Op::oDelete).SetTable(Collection::Name).CreatePtr();
status = meta_->Execute<Collection>(c_ctx4, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
CollectionPtr collection3;
status = meta_->Select<Collection>(result_id, collection3);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(collection3, nullptr);
}
};
auto cc_task = [&](size_t j) {
std::string collection_name_prefix = "meta_test_collection_cc_" + std::to_string(j) + "_";
int64_t result_id;
Status status;
for (size_t jj = 0; jj < 20; jj ++) {
std::string collection_name = collection_name_prefix + std::to_string(jj);
milvus::json cj{{"segment_row_count", 1024}};
auto collection = std::make_shared<Collection>(collection_name, cj);
auto c_ctx = ResourceContextBuilder<Collection>().SetResource(collection).SetOp(Op::oAdd).CreatePtr();
status = meta_->Execute<Collection>(c_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
collection->SetID(result_id);
std::string partition_name = collection_name + "_p_" + std::to_string(jj);
auto partition = std::make_shared<Partition>(partition_name, collection->GetID());
auto p_ctx = ResourceContextBuilder<Partition>().SetResource(partition).SetOp(Op::oAdd).CreatePtr();
status = meta_->Execute<Partition>(p_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
partition->SetID(result_id);
std::string segment_name = partition_name + "_s_" + std::to_string(jj);
auto segment = std::make_shared<Segment>(collection->GetID(), partition->GetID());
auto s_ctx = ResourceContextBuilder<Segment>().SetResource(segment).SetOp(Op::oAdd).CreatePtr();
status = meta_->Execute<Segment>(s_ctx, result_id);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_GT(result_id, 0);
segment->SetID(result_id);
auto session = meta_->CreateSession();
collection->Activate();
auto c_ctx2 = ResourceContextBuilder<Collection>().SetResource(collection)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
ASSERT_TRUE(session->Apply<Collection>(c_ctx2).ok());
partition->Activate();
auto p_ctx2 = ResourceContextBuilder<Partition>().SetResource(partition)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
ASSERT_TRUE(session->Apply<Partition>(p_ctx2).ok());
segment->Activate();
auto s_ctx2 = ResourceContextBuilder<Segment>().SetResource(segment)
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
ASSERT_TRUE(session->Apply<Segment>(s_ctx2).ok());
std::vector<int64_t> ids;
status = session->Commit(ids);
ASSERT_TRUE(status.ok()) << status.ToString();
}
};
unsigned int thread_hint = std::thread::hardware_concurrency();
std::vector<std::thread> request_threads;
for (size_t i = 0; i < 3 * thread_hint; i++) {
request_threads.emplace_back(request_worker, i);
}
std::vector<std::thread> cc_threads;
for (size_t j = 0; j < 3 * thread_hint; j++) {
cc_threads.emplace_back(cc_task, j);
}
for (auto& t : request_threads) {
t.join();
}
for (auto& t : cc_threads) {
t.join();
}
}
......@@ -300,7 +300,7 @@ void
MetaTest::SetUp() {
auto engine = std::make_shared<milvus::engine::meta::MockEngine>();
// milvus::engine::DBMetaOptions options;
// options.backend_uri_ = "mysql://root:12345678@127.0.0.1:3307/milvus";
// options.backend_uri_ = "mysql://root:12345678@127.0.0.1:3309/milvus";
// auto engine = std::make_shared<milvus::engine::meta::MySqlEngine>(options);
meta_ = std::make_shared<milvus::engine::meta::MetaAdapter>(engine);
meta_->TruncateAll();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册