提交 cf8ee20f 编写于 作者: Z zhiru

update


Former-commit-id: 31aeb1f531676aeb9e9655a169a80ff508f6ae00
上级 90692fe3
......@@ -33,6 +33,7 @@ function mysql_exc()
mysql_exc "CREATE DATABASE IF NOT EXISTS ${MYSQL_DB_NAME};"
mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%';"
mysql_exc "FLUSH PRIVILEGES;"
mysql_exc "USE ${MYSQL_DB_NAME};"
# get baseline
${LCOV_CMD} -c -i -d ${DIR_GCNO} -o "${FILE_INFO_BASE}"
......
#include "MySQLConnectionPool.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
// Do a simple form of in-use connection limiting: wait to return
// a connection until there are a reasonably low number in use
// already. Can't do this in create() because we're interested in
// connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection *MySQLConnectionPool::grab() {
while (conns_in_use_ > max_pool_size_) {
sleep(1);
}
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
}
// Other half of in-use conn count limit
void MySQLConnectionPool::release(const mysqlpp::Connection *pc) {
mysqlpp::ConnectionPool::release(pc);
if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_;
} else {
--conns_in_use_;
}
}
int MySQLConnectionPool::getConnectionsInUse() {
return conns_in_use_;
}
void MySQLConnectionPool::set_max_idle_time(int max_idle) {
max_idle_time_ = max_idle;
}
std::string MySQLConnectionPool::getDB() {
return db_;
}
// Superclass overrides
mysqlpp::Connection *MySQLConnectionPool::create() {
try {
// Create connection using the parameters we were passed upon
// creation.
mysqlpp::Connection *conn = new mysqlpp::Connection();
conn->set_option(new mysqlpp::ReconnectOption(true));
conn->connect(db_.empty() ? 0 : db_.c_str(),
server_.empty() ? 0 : server_.c_str(),
user_.empty() ? 0 : user_.c_str(),
password_.empty() ? 0 : password_.c_str(),
port_);
return conn;
} catch (const mysqlpp::ConnectionFailed& er) {
ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what();
return nullptr;
}
}
void MySQLConnectionPool::destroy(mysqlpp::Connection *cp) {
// Our superclass can't know how we created the Connection, so
// it delegates destruction to us, to be safe.
delete cp;
}
unsigned int MySQLConnectionPool::max_idle_time() {
return max_idle_time_;
}
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -6,6 +6,11 @@
#include "Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
class MySQLConnectionPool : public mysqlpp::ConnectionPool {
public:
......@@ -21,8 +26,7 @@ public:
password_(passWord),
server_(serverIp),
port_(port),
max_pool_size_(maxPoolSize)
{
max_pool_size_(maxPoolSize) {
conns_in_use_ = 0;
......@@ -35,69 +39,25 @@ public:
clear();
}
// Do a simple form of in-use connection limiting: wait to return
// a connection until there are a reasonably low number in use
// already. Can't do this in create() because we're interested in
// connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection* grab() override {
while (conns_in_use_ > max_pool_size_) {
sleep(1);
}
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
}
mysqlpp::Connection *grab() override;
// Other half of in-use conn count limit
void release(const mysqlpp::Connection* pc) override {
mysqlpp::ConnectionPool::release(pc);
if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl;
}
else {
--conns_in_use_;
}
}
void release(const mysqlpp::Connection *pc) override;
int getConnectionsInUse() {
return conns_in_use_;
}
int getConnectionsInUse();
void set_max_idle_time(int max_idle) {
max_idle_time_ = max_idle;
}
void set_max_idle_time(int max_idle);
std::string getDB() {
return db_;
}
std::string getDB();
protected:
// Superclass overrides
mysqlpp::Connection* create() override {
// Create connection using the parameters we were passed upon
// creation.
mysqlpp::Connection* conn = new mysqlpp::Connection();
conn->set_option(new mysqlpp::ReconnectOption(true));
conn->connect(db_.empty() ? 0 : db_.c_str(),
server_.empty() ? 0 : server_.c_str(),
user_.empty() ? 0 : user_.c_str(),
password_.empty() ? 0 : password_.c_str(),
port_);
return conn;
}
mysqlpp::Connection *create() override;
void destroy(mysqlpp::Connection* cp) override {
// Our superclass can't know how we created the Connection, so
// it delegates destruction to us, to be safe.
delete cp;
}
void destroy(mysqlpp::Connection *cp) override;
unsigned int max_idle_time() override {
return max_idle_time_;
}
unsigned int max_idle_time() override;
private:
// Number of connections currently in use
......@@ -110,4 +70,9 @@ private:
int max_pool_size_;
unsigned int max_idle_time_;
};
\ No newline at end of file
};
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -169,6 +169,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mysql_connection_pool_->getConnectionsInUse();
// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
// return Status::Error("DB connection failed: ", connectionPtr->error());
......@@ -234,9 +238,6 @@ namespace meta {
// } else {
// return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
// }
} catch (const ConnectionFailed& er) {
ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what();
return Status::DBTransactionError("Failed to connect to database server", er.what());
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what();
......@@ -292,6 +293,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -335,6 +340,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -351,7 +360,7 @@ namespace meta {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
StoreQueryResult res = createTableQuery.store();
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
int state = res[0]["state"];
if (TableSchema::TO_DELETE == state) {
......@@ -438,6 +447,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -483,6 +496,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -529,6 +546,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -544,7 +565,6 @@ namespace meta {
res = describeTableQuery.store();
} //Scoped Connection
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
const Row& resRow = res[0];
......@@ -592,6 +612,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -609,7 +633,6 @@ namespace meta {
res = hasTableQuery.store();
} //Scoped Connection
assert(res && res.num_rows() == 1);
int check = res[0]["check"];
has_or_not = (check == 1);
......@@ -639,6 +662,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -726,6 +753,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -792,6 +823,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -875,6 +910,9 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -986,6 +1024,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1078,6 +1120,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1093,8 +1139,6 @@ namespace meta {
res = getTableFileQuery.store();
} //Scoped Connection
assert(res);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
......@@ -1162,6 +1206,10 @@ namespace meta {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1212,6 +1260,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1226,7 +1278,6 @@ namespace meta {
res = getSizeQuery.store();
} //Scoped Connection
assert(res && res.num_rows() == 1);
// if (!res) {
//// std::cout << "result is NULL" << std::endl;
// return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", getSizeQuery.error());
......@@ -1272,6 +1323,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1288,7 +1343,6 @@ namespace meta {
// std::cout << discardFilesQuery.str() << std::endl;
StoreQueryResult res = discardFilesQuery.store();
assert(res);
if (res.num_rows() == 0) {
return Status::OK();
}
......@@ -1350,6 +1404,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1365,7 +1423,6 @@ namespace meta {
StoreQueryResult res = updateTableFileQuery.store();
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
int state = res[0]["state"];
if (state == TableSchema::TO_DELETE) {
......@@ -1432,6 +1489,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1455,7 +1516,6 @@ namespace meta {
StoreQueryResult res = updateTableFilesQuery.store();
assert(res && res.num_rows() == 1);
int check = res[0]["check"];
has_tables[file_schema.table_id_] = (check == 1);
}
......@@ -1527,6 +1587,10 @@ namespace meta {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
......@@ -1542,8 +1606,6 @@ namespace meta {
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
......@@ -1611,6 +1673,10 @@ namespace meta {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use after creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
......@@ -1624,7 +1690,6 @@ namespace meta {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
// std::cout << res.num_rows() << std::endl;
if (!res.empty()) {
......@@ -1677,6 +1742,10 @@ namespace meta {
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1690,7 +1759,7 @@ namespace meta {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
StoreQueryResult res = cleanUpQuery.store();
assert(res);
if (!res.empty()) {
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
......@@ -1736,6 +1805,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......@@ -1759,7 +1832,12 @@ namespace meta {
result += size;
}
assert(table_schema.dimension_ != 0);
if (table_schema.dimension_ <= 0) {
std::stringstream errorMsg;
errorMsg << "MySQLMetaImpl::Count: " << "table dimension = " << std::to_string(table_schema.dimension_) << ", table_id = " << table_id;
ENGINE_LOG_ERROR << errorMsg.str();
return Status::Error(errorMsg.str());
}
result /= table_schema.dimension_;
result /= sizeof(float);
......@@ -1786,6 +1864,10 @@ namespace meta {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册