未验证 提交 f1d271c0 编写于 作者: B BossZou 提交者: GitHub

(db/snapshot) Optimize mysql (#3095)

* Move MysqlPool to backend
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Update header
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Optimize mysqlpool
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>
Co-authored-by: NWang XiangYu <xy.wang@zilliz.com>
上级 fd2e3de6
......@@ -9,10 +9,14 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/meta/MySQLConnectionPool.h"
#include <fiu-local.h>
#include "db/meta/backend/MySQLConnectionPool.h"
#include <thread>
#include <fiu-local.h>
#include "utils/Log.h"
namespace milvus::engine::meta {
// Do a simple form of in-use connection limiting: wait to return
......@@ -22,11 +26,11 @@ namespace milvus::engine::meta {
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection*
MySQLConnectionPool::grab() {
while (conns_in_use_ > max_pool_size_) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
{
std::unique_lock<std::mutex> lock(mutex_);
full_.wait(lock, [this] { return conns_in_use_ < max_pool_size_; });
++conns_in_use_;
}
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
}
......@@ -34,6 +38,8 @@ MySQLConnectionPool::grab() {
void
MySQLConnectionPool::release(const mysqlpp::Connection* pc) {
mysqlpp::ConnectionPool::release(pc);
std::lock_guard<std::mutex> lock(mutex_);
if (conns_in_use_ <= 0) {
LOG_ENGINE_WARNING_ << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = "
<< conns_in_use_;
......@@ -60,6 +66,7 @@ MySQLConnectionPool::create() {
// creation.
auto conn = new mysqlpp::Connection();
conn->set_option(new mysqlpp::ReconnectOption(true));
conn->set_option(new mysqlpp::ConnectTimeoutOption(5));
conn->connect(db_name_.empty() ? 0 : db_name_.c_str(), server_.empty() ? 0 : server_.c_str(),
user_.empty() ? 0 : user_.c_str(), password_.empty() ? 0 : password_.c_str(), port_);
return conn;
......
......@@ -14,15 +14,13 @@
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <mysql++/mysql++.h>
#include "utils/Log.h"
namespace milvus {
namespace engine {
namespace meta {
namespace milvus::engine::meta {
class MySQLConnectionPool : public mysqlpp::ConnectionPool {
public:
......@@ -77,8 +75,9 @@ class MySQLConnectionPool : public mysqlpp::ConnectionPool {
int max_pool_size_;
unsigned int max_idle_time_ = 10; // 10 seconds
std::mutex mutex_;
std::condition_variable full_;
};
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace milvus::engine::meta
......@@ -18,11 +18,13 @@
#include <vector>
#include <fiu-local.h>
#include <mysql++/mysql++.h>
#include "db/Utils.h"
#include "db/meta/MetaNames.h"
#include "db/meta/backend/MetaHelper.h"
#include "db/meta/backend/MetaSchema.h"
#include "utils//Log.h"
#include "utils/Exception.h"
#include "utils/StringHelpFunctions.h"
......@@ -208,6 +210,9 @@ Status
MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
try {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr || !connectionPtr->connected()) {
return Status(SS_TIMEOUT, "Mysql server is not accessed");
}
std::string sql;
auto status = MetaHelper::MetaQueryContextToSql(context, sql);
......@@ -232,9 +237,13 @@ MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
}
attrs.push_back(attrs_map);
}
} catch (const mysqlpp::ConnectionFailed& er) {
return Status(SS_TIMEOUT, er.what());
} catch (const mysqlpp::BadQuery& er) {
// Handle any query errors
// cerr << "Query error: " << er.what() << endl;
LOG_ENGINE_ERROR_ << "Query error: " << er.what();
if (er.errnum() == 2006) {
return Status(SS_TIMEOUT, er.what());
}
return Status(1, er.what());
} catch (const mysqlpp::BadConversion& er) {
// Handle bad conversions
......@@ -255,6 +264,10 @@ Status
MySqlEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_contexts, std::vector<int64_t>& result_ids) {
try {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr || !connectionPtr->connected()) {
return Status(SS_TIMEOUT, "Mysql server is not accessed");
}
mysqlpp::Transaction trans(*connectionPtr, mysqlpp::Transaction::serializable, mysqlpp::Transaction::session);
std::lock_guard<std::mutex> lock(meta_mutex_);
......@@ -276,12 +289,13 @@ MySqlEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_context
}
trans.commit();
// std::cout << "[DB] Transaction commit " << std::endl;
} catch (const mysqlpp::ConnectionFailed& er) {
return Status(SS_TIMEOUT, er.what());
} catch (const mysqlpp::BadQuery& er) {
// Handle any query errors
// cerr << "Query error: " << er.what() << endl;
// return -1;
// std::cout << "[DB] Error: " << er.what() << std::endl;
LOG_ENGINE_ERROR_ << "MySql Error Code: " << er.errnum() << ": " << er.what();
if (er.errnum() == 2006) {
return Status(SS_TIMEOUT, er.what());
}
return Status(SERVER_UNSUPPORTED_ERROR, er.what());
} catch (const mysqlpp::BadConversion& er) {
// Handle bad conversions
......
......@@ -15,11 +15,9 @@
#include <mutex>
#include <vector>
#include <mysql++/mysql++.h>
#include "db/Options.h"
#include "db/meta/MySQLConnectionPool.h"
#include "db/meta/backend/MetaEngine.h"
#include "db/meta/backend/MySQLConnectionPool.h"
namespace milvus::engine::meta {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册