提交 b25873ba 编写于 作者: J jinhai

Merge branch 'mysql-0.3.0' into 'branch-0.3.0'

MS-105 - Add MySQL

See merge request megasearch/vecwise_engine!119

Former-commit-id: 2693c3a3163ec5a6bb6801a40fe7983d99862d91
......@@ -46,7 +46,9 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-85 - add NetIO metric
- MS-96 - add new query interface for specified files
- MS-97 - Add S3 SDK for MinIO Storage
- MS-105 - Add MySQL
- MS-130 - Add prometheus_test
## Task
- MS-74 - Change README.md in cpp
- MS-88 - Add support for arm architecture
......
......@@ -113,20 +113,13 @@ link_directories(${MILVUS_BINARY_DIR})
set(MILVUS_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include)
set(MILVUS_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src)
#set(MILVUS_THIRD_PARTY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
#set(MILVUS_THIRD_PARTY_BUILD ${CMAKE_CURRENT_SOURCE_DIR}/third_party/build)
add_compile_definitions(PROFILER=${PROFILER})
include_directories(${MILVUS_ENGINE_INCLUDE})
include_directories(${MILVUS_ENGINE_SRC})
#include_directories(${MILVUS_THIRD_PARTY_BUILD}/include)
link_directories(${CMAKE_CURRRENT_BINARY_DIR})
#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib)
#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib64)
#execute_process(COMMAND bash build.sh
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
add_subdirectory(src)
......
### Compilation
#### Step 1: install necessery tools
Install MySQL
centos7 :
yum install gfortran flex bison
yum install gfortran qt4 flex bison mysql-devel
ubuntu16.04 :
sudo apt-get install gfortran flex bison
sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev
If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link:
```
sudo ln -s /path/to/libmysqlclient.so /path/to/libmysqlclient_r.so
```
#### Step 2: build(output to cmake_build folder)
cmake_build/src/milvus_server is the server
cmake_build/src/libmilvus_engine.a is the static library
......
......@@ -93,6 +93,8 @@ define_option(MILVUS_WITH_SQLITE "Build with SQLite library" ON)
define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON)
define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON)
define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON)
define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON)
......
......@@ -26,6 +26,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES
JSONCONS
LAPACK
Lz4
MySQLPP
OpenBLAS
Prometheus
RocksDB
......@@ -56,12 +57,14 @@ macro(build_dependency DEPENDENCY_NAME)
build_easyloggingpp()
elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS")
build_faiss()
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
build_gtest()
elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK")
build_lapack()
elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4")
build_lz4()
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
build_gtest()
elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP")
build_mysqlpp()
elseif ("${DEPENDENCY_NAME}" STREQUAL "JSONCONS")
build_jsoncons()
elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS")
......@@ -265,6 +268,12 @@ else()
set(LZ4_SOURCE_URL "https://github.com/lz4/lz4/archive/${LZ4_VERSION}.tar.gz")
endif()
if(DEFINED ENV{MILVUS_MYSQLPP_URL})
set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}")
else()
set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz")
endif()
if (DEFINED ENV{MILVUS_OPENBLAS_URL})
set(OPENBLAS_SOURCE_URL "$ENV{MILVUS_OPENBLAS_URL}")
else ()
......@@ -830,7 +839,7 @@ macro(build_faiss)
BUILD_COMMAND
${MAKE} ${MAKE_BUILD_ARGS} all
COMMAND
cd gpu && make ${MAKE_BUILD_ARGS}
cd gpu && ${MAKE} ${MAKE_BUILD_ARGS}
BUILD_IN_SOURCE
1
# INSTALL_DIR
......@@ -1068,6 +1077,65 @@ if(MILVUS_WITH_LZ4)
include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
endif()
# ----------------------------------------------------------------------
# MySQL++
macro(build_mysqlpp)
message(STATUS "Building MySQL++-${MYSQLPP_VERSION} from source")
set(MYSQLPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep")
set(MYSQLPP_INCLUDE_DIR "${MYSQLPP_PREFIX}/include")
set(MYSQLPP_SHARED_LIB
"${MYSQLPP_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}")
set(MYSQLPP_CONFIGURE_ARGS
"--prefix=${MYSQLPP_PREFIX}"
"--enable-thread-check"
"CFLAGS=${EP_C_FLAGS}"
"CXXFLAGS=${EP_CXX_FLAGS}"
"LDFLAGS=-pthread")
externalproject_add(mysqlpp_ep
URL
${MYSQLPP_SOURCE_URL}
# GIT_REPOSITORY
# ${MYSQLPP_SOURCE_URL}
# GIT_TAG
# ${MYSQLPP_VERSION}
# GIT_SHALLOW
# TRUE
${EP_LOG_OPTIONS}
CONFIGURE_COMMAND
# "./bootstrap"
# COMMAND
"./configure"
${MYSQLPP_CONFIGURE_ARGS}
BUILD_COMMAND
${MAKE} ${MAKE_BUILD_ARGS}
BUILD_IN_SOURCE
1
BUILD_BYPRODUCTS
${MYSQLPP_SHARED_LIB})
file(MAKE_DIRECTORY "${MYSQLPP_INCLUDE_DIR}")
add_library(mysqlpp SHARED IMPORTED)
set_target_properties(
mysqlpp
PROPERTIES
IMPORTED_LOCATION "${MYSQLPP_SHARED_LIB}"
INTERFACE_INCLUDE_DIRECTORIES "${MYSQLPP_INCLUDE_DIR}")
add_dependencies(mysqlpp mysqlpp_ep)
endmacro()
if(MILVUS_WITH_MYSQLPP)
resolve_dependency(MySQLPP)
get_target_property(MYSQLPP_INCLUDE_DIR mysqlpp INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM "${MYSQLPP_INCLUDE_DIR}")
link_directories(SYSTEM ${MYSQLPP_PREFIX}/lib)
endif()
# ----------------------------------------------------------------------
# Prometheus
......
......@@ -20,15 +20,20 @@ if [ $? -ne 0 ]; then
fi
for test in `ls ${DIR_UNITTEST}`; do
echo $test
case ${test} in
db_test)
# set run args for db_test
args="mysql://root:Fantast1c@192.168.1.194:3306/test"
;;
*_test)
args=""
;;
esac
# run unittest
./${DIR_UNITTEST}/${test}
./${DIR_UNITTEST}/${test} "${args}"
if [ $? -ne 0 ]; then
echo ${DIR_UNITTEST}/${test} "run failed"
fi
esac
done
# gen test converage
......
......@@ -62,6 +62,7 @@ set(s3_client_files
include_directories(/usr/include)
include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
include_directories(thrift/gen-cpp)
include_directories(/usr/include/mysql)
set(third_party_libs
easyloggingpp
......@@ -83,6 +84,7 @@ set(third_party_libs
snappy
zlib
zstd
mysqlpp
${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
)
if (MEGASEARCH_WITH_ARROW STREQUAL "ON")
......@@ -181,4 +183,10 @@ endif ()
install(TARGETS milvus_server DESTINATION bin)
install(FILES
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4
DESTINATION bin/lib) #need to copy libmysqlpp.so
add_subdirectory(sdk)
......@@ -7,11 +7,13 @@
#include "DBMetaImpl.h"
#include "Log.h"
#include "EngineFactory.h"
#include "Factories.h"
#include "metrics/Metrics.h"
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
#include "utils/TimeRecorder.h"
#include "MetaConsts.h"
#include <assert.h>
#include <chrono>
......@@ -132,11 +134,14 @@ void CalcScore(uint64_t vector_count,
DBImpl::DBImpl(const Options& options)
: options_(options),
shutting_down_(false),
meta_ptr_(new meta::DBMetaImpl(options_.meta)),
mem_mgr_(new MemManager(meta_ptr_, options_)),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
// mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
if (options.mode != Options::MODE::READ_ONLY) {
StartTimerTasks();
}
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -465,9 +470,14 @@ void DBImpl::StartMetricTask() {
}
void DBImpl::StartCompactionTask() {
// static int count = 0;
// count++;
// std::cout << "StartCompactionTask: " << count << std::endl;
// std::cout << "c: " << count++ << std::endl;
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
// std::cout << "c r: " << count++ << std::endl;
return;
}
......@@ -574,6 +584,10 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
// static int b_count = 0;
// b_count++;
// std::cout << "BackgroundCompaction: " << b_count << std::endl;
Status status;
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
......@@ -584,7 +598,13 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
}
meta_ptr_->Archive();
meta_ptr_->CleanUpFilesWithTTL(1);
int ttl = 1;
if (options_.mode == Options::MODE::CLUSTER) {
ttl = meta::D_SEC;
// ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
}
void DBImpl::StartBuildIndexTask() {
......
......@@ -183,6 +183,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
......
......@@ -3,16 +3,18 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <stdlib.h>
#include "Factories.h"
#include "DBImpl.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
#include <iostream>
#include <vector>
#include <assert.h>
#include <easylogging++.h>
#include <regex>
#include "Exception.h"
namespace zilliz {
namespace milvus {
......@@ -26,6 +28,7 @@ DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) {
ss << "/tmp/" << rand();
p = ss.str();
}
DBMetaOptions meta;
meta.path = p;
return meta;
......@@ -43,6 +46,48 @@ std::shared_ptr<meta::DBMetaImpl> DBMetaImplFactory::Build() {
return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(options));
}
std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOptions,
const int& mode) {
std::string uri = metaOptions.backend_uri;
std::string dialectRegex = "(.*)";
std::string usernameRegex = "(.*)";
std::string passwordRegex = "(.*)";
std::string hostRegex = "(.*)";
std::string portRegex = "(.*)";
std::string dbNameRegex = "(.*)";
std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
usernameRegex + "\\:" +
passwordRegex + "\\@" +
hostRegex + "\\:" +
portRegex + "\\/" +
dbNameRegex;
std::regex uriRegex(uriRegexStr);
std::smatch pieces_match;
if (std::regex_match(uri, pieces_match, uriRegex)) {
std::string dialect = pieces_match[1].str();
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions, mode));
}
else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
}
else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
}
}
else {
ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri;
throw InvalidArgumentException("Wrong URI format ");
}
}
std::shared_ptr<DB> DBFactory::Build() {
auto options = OptionsFactory::Build();
auto db = DBFactory::Build(options);
......
......@@ -7,6 +7,7 @@
#include "DB.h"
#include "DBMetaImpl.h"
#include "MySQLMetaImpl.h"
#include "Options.h"
#include "ExecutionEngine.h"
......@@ -27,6 +28,7 @@ struct OptionsFactory {
struct DBMetaImplFactory {
static std::shared_ptr<meta::DBMetaImpl> Build();
static std::shared_ptr<meta::Meta> Build(const DBMetaOptions& metaOptions, const int& mode);
};
struct DBFactory {
......
#include "mysql++/mysql++.h"
#include <string>
#include <unistd.h>
#include <atomic>
#include "Log.h"
class MySQLConnectionPool : public mysqlpp::ConnectionPool {
public:
// The object's only constructor
MySQLConnectionPool(std::string dbName,
std::string userName,
std::string passWord,
std::string serverIp,
int port = 0,
int maxPoolSize = 8) :
db_(dbName),
user_(userName),
password_(passWord),
server_(serverIp),
port_(port),
max_pool_size_(maxPoolSize)
{
conns_in_use_ = 0;
max_idle_time_ = 10; //10 seconds
}
// The destructor. We _must_ call ConnectionPool::clear() here,
// because our superclass can't do it for us.
~MySQLConnectionPool() override {
clear();
}
// Do a simple form of in-use connection limiting: wait to return
// a connection until there are a reasonably low number in use
// already. Can't do this in create() because we're interested in
// connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection* grab() override {
while (conns_in_use_ > max_pool_size_) {
sleep(1);
}
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
}
// Other half of in-use conn count limit
void release(const mysqlpp::Connection* pc) override {
mysqlpp::ConnectionPool::release(pc);
if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl;
}
else {
--conns_in_use_;
}
}
int getConnectionsInUse() {
return conns_in_use_;
}
void set_max_idle_time(int max_idle) {
max_idle_time_ = max_idle;
}
protected:
// Superclass overrides
mysqlpp::Connection* create() override {
// Create connection using the parameters we were passed upon
// creation.
mysqlpp::Connection* conn = new mysqlpp::Connection();
conn->set_option(new mysqlpp::ReconnectOption(true));
conn->connect(db_.empty() ? 0 : db_.c_str(),
server_.empty() ? 0 : server_.c_str(),
user_.empty() ? 0 : user_.c_str(),
password_.empty() ? 0 : password_.c_str(),
port_);
return conn;
}
void destroy(mysqlpp::Connection* cp) override {
// Our superclass can't know how we created the Connection, so
// it delegates destruction to us, to be safe.
delete cp;
}
unsigned int max_idle_time() override {
return max_idle_time_;
}
private:
// Number of connections currently in use
std::atomic<int> conns_in_use_;
// Our connection parameters
std::string db_, user_, password_, server_;
int port_;
int max_pool_size_;
unsigned int max_idle_time_;
};
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "MySQLMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "Log.h"
#include "MetaConsts.h"
#include "Factories.h"
#include "metrics/Metrics.h"
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
#include <mutex>
#include <thread>
#include "mysql++/mysql++.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
using namespace mysqlpp;
// static std::unique_ptr<Connection> connectionPtr(new Connection());
// std::recursive_mutex mysql_mutex;
//
// std::unique_ptr<Connection>& MySQLMetaImpl::getConnectionPtr() {
//// static std::recursive_mutex connectionMutex_;
// std::lock_guard<std::recursive_mutex> lock(connectionMutex_);
// return connectionPtr;
// }
namespace {
Status HandleException(const std::string& desc, std::exception &e) {
ENGINE_LOG_ERROR << desc << ": " << e.what();
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string MySQLMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void MySQLMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
table_id = ss.str();
return Status::OK();
}
Status MySQLMetaImpl::NextFileId(std::string &file_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
file_id = ss.str();
return Status::OK();
}
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int& mode)
: options_(options_),
mode_(mode) {
Initialize();
}
Status MySQLMetaImpl::Initialize() {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
return Status::DBTransactionError("Failed to create db directory", options_.path);
}
}
std::string uri = options_.backend_uri;
std::string dialectRegex = "(.*)";
std::string usernameRegex = "(.*)";
std::string passwordRegex = "(.*)";
std::string hostRegex = "(.*)";
std::string portRegex = "(.*)";
std::string dbNameRegex = "(.*)";
std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
usernameRegex + "\\:" +
passwordRegex + "\\@" +
hostRegex + "\\:" +
portRegex + "\\/" +
dbNameRegex;
std::regex uriRegex(uriRegexStr);
std::smatch pieces_match;
if (std::regex_match(uri, pieces_match, uriRegex)) {
std::string dialect = pieces_match[1].str();
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") == std::string::npos) {
return Status::Error("URI's dialect is not MySQL");
}
const char* username = pieces_match[2].str().c_str();
const char* password = pieces_match[3].str().c_str();
const char* serverAddress = pieces_match[4].str().c_str();
unsigned int port = 0;
if (!pieces_match[5].str().empty()) {
port = std::stoi(pieces_match[5].str());
}
const char* dbName = pieces_match[6].str().c_str();
//std::cout << dbName << " " << serverAddress << " " << username << " " << password << " " << port << std::endl;
// connectionPtr->set_option(new MultiStatementsOption(true));
// connectionPtr->set_option(new mysqlpp::ReconnectOption(true));
int threadHint = std::thread::hardware_concurrency();
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
mysql_connection_pool_ = std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl;
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
try {
CleanUp();
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mysql_connection_pool_->getConnectionsInUse();
// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
// return Status::Error("DB connection failed: ", connectionPtr->error());
// }
if (!connectionPtr->thread_aware()) {
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it.");
}
Query InitializeQuery = connectionPtr->query();
// InitializeQuery << "SET max_allowed_packet=67108864;";
// if (!InitializeQuery.exec()) {
// return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
// }
// InitializeQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";
InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" <<
"id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
"table_id VARCHAR(255) UNIQUE NOT NULL, " <<
"state INT NOT NULL, " <<
"dimension SMALLINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"files_cnt BIGINT DEFAULT 0 NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"store_raw_data BOOL DEFAULT false NOT NULL);";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
}
if (!InitializeQuery.exec()) {
return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
}
InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" <<
"id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
"table_id VARCHAR(255) NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"file_id VARCHAR(255) NOT NULL, " <<
"file_type INT DEFAULT 0 NOT NULL, " <<
"size BIGINT DEFAULT 0 NOT NULL, " <<
"updated_time BIGINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"date INT DEFAULT -1 NOT NULL);";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
}
if (!InitializeQuery.exec()) {
return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
}
} //Scoped Connection
// //Consume all results to avoid "Commands out of sync" error
// while (InitializeQuery.more_results()) {
// InitializeQuery.store_next();
// }
return Status::OK();
// if (InitializeQuery.exec()) {
// std::cout << "XXXXXXXXXXXXXXXXXXXXXXXXX" << std::endl;
// while (InitializeQuery.more_results()) {
// InitializeQuery.store_next();
// }
// return Status::OK();
// } else {
// return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
// }
} catch (const ConnectionFailed& er) {
ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what();
return Status::DBTransactionError("Failed to connect to database server", er.what());
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR DURING INITIALIZATION" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what());
} catch (std::exception &e) {
return HandleException("Encounter exception during initialization", e);
}
}
else {
ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
return Status::Error("Wrong URI format");
}
}
// PXU TODO: Temp solution. Will fix later
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (dates.empty()) {
return Status::OK();
}
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
try {
auto yesterday = GetDateWithDelta(-1);
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Could not delete partitions within 2 days");
}
}
std::stringstream dateListSS;
for (auto &date : dates) {
dateListSS << std::to_string(date) << ", ";
}
std::string dateListStr = dateListSS.str();
dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query dropPartitionsByDatesQuery = connectionPtr->query();
dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"date in (" << dateListStr << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();
}
if (!dropPartitionsByDatesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES";
return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES",
dropPartitionsByDatesQuery.error());
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
// server::Metrics::GetInstance().MetaAccessTotalIncrement();
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query createTableQuery = connectionPtr->query();
// ENGINE_LOG_DEBUG << "Create Table in";
if (table_schema.table_id_.empty()) {
NextTableId(table_schema.table_id_);
} else {
createTableQuery << "SELECT state FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << ";";
// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str();
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
}
StoreQueryResult res = createTableQuery.store();
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
int state = res[0]["state"];
if (TableSchema::TO_DELETE == state) {
return Status::Error("Table already exists and it is in delete state, please wait a second");
}
else {
return Status::OK();//table already exists, no error
}
}
}
// ENGINE_LOG_DEBUG << "Create Table start";
table_schema.files_cnt_ = 0;
table_schema.id_ = -1;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
// auto start_time = METRICS_NOW_TIME;
std::string id = "NULL"; //auto-increment
std::string table_id = table_schema.table_id_;
std::string state = std::to_string(table_schema.state_);
std::string dimension = std::to_string(table_schema.dimension_);
std::string created_on = std::to_string(table_schema.created_on_);
std::string files_cnt = "0";
std::string engine_type = std::to_string(table_schema.engine_type_);
std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false";
createTableQuery << "INSERT INTO Tables VALUES" <<
"(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data << ");";
// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str();
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
}
if (SimpleResult res = createTableQuery.execute()) {
table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
// std::cout << table_schema.id_ << std::endl;
//Consume all results to avoid "Commands out of sync" error
// while (createTableQuery.more_results()) {
// createTableQuery.store_next();
// }
} else {
ENGINE_LOG_ERROR << "Add Table Error";
return Status::DBTransactionError("Add Table Error", createTableQuery.error());
}
} //Scoped Connection
// auto end_time = METRICS_NOW_TIME;
// auto total_time = METRICS_MICROSECONDS(start_time, end_time);
// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what());
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e);
}
return Status::OK();
}
Status MySQLMetaImpl::DeleteTable(const std::string& table_id) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
//soft delete table
Query deleteTableQuery = connectionPtr->query();
//
deleteTableQuery << "UPDATE Tables " <<
"SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
"WHERE table_id = " << quote << table_id << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
}
if (!deleteTableQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE";
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
}
} //Scoped Connection
if (mode_ != Options::MODE::SINGLE) {
DeleteTableFiles(table_id);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
//soft delete table files
Query deleteTableFilesQuery = connectionPtr->query();
//
deleteTableFilesQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
"updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
}
if (!deleteTableFilesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES";
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error());
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE FILES", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query describeTableQuery = connectionPtr->query();
describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " <<
"FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
}
res = describeTableQuery.store();
} //Scoped Connection
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
const Row& resRow = res[0];
table_schema.id_ = resRow["id"]; //implicit conversion
table_schema.dimension_ = resRow["dimension"];
table_schema.files_cnt_ = resRow["files_cnt"];
table_schema.engine_type_ = resRow["engine_type"];
int store_raw_data = resRow["store_raw_data"];
table_schema.store_raw_data_ = (store_raw_data == 1);
}
else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING TABLE", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query hasTableQuery = connectionPtr->query();
//since table_id is a unique column we just need to check whether it exists or not
hasTableQuery << "SELECT EXISTS " <<
"(SELECT 1 FROM Tables " <<
"WHERE table_id = " << quote << table_id << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
"AS " << quote << "check" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
}
res = hasTableQuery.store();
} //Scoped Connection
assert(res && res.num_rows() == 1);
int check = res[0]["check"];
has_or_not = (check == 1);
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query allTablesQuery = connectionPtr->query();
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
"FROM Tables " <<
"WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
}
res = allTablesQuery.store();
} //Scoped Connection
for (auto& resRow : res) {
TableSchema table_schema;
table_schema.id_ = resRow["id"]; //implicit conversion
std::string table_id;
resRow["table_id"].to_string(table_id);
table_schema.table_id_ = table_id;
table_schema.dimension_ = resRow["dimension"];
table_schema.files_cnt_ = resRow["files_cnt"];
table_schema.engine_type_ = resRow["engine_type"];
int store_raw_data = resRow["store_raw_data"];
table_schema.store_raw_data_ = (store_raw_data == 1);
table_schema_array.emplace_back(table_schema);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING ALL TABLES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING ALL TABLES", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (file_schema.date_ == EmptyDate) {
file_schema.date_ = Meta::GetDate();
}
TableSchema table_schema;
table_schema.table_id_ = file_schema.table_id_;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
try {
MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.file_type_ = TableFileSchema::NEW;
file_schema.dimension_ = table_schema.dimension_;
file_schema.size_ = 0;
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
std::string id = "NULL"; //auto-increment
std::string table_id = file_schema.table_id_;
std::string engine_type = std::to_string(file_schema.engine_type_);
std::string file_id = file_schema.file_id_;
std::string file_type = std::to_string(file_schema.file_type_);
std::string size = std::to_string(file_schema.size_);
std::string updated_time = std::to_string(file_schema.updated_time_);
std::string created_on = std::to_string(file_schema.created_on_);
std::string date = std::to_string(file_schema.date_);
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query createTableFileQuery = connectionPtr->query();
createTableFileQuery << "INSERT INTO TableFiles VALUES" <<
"(" << id << ", " << quote << table_id << ", " << engine_type << ", " <<
quote << file_id << ", " << file_type << ", " << size << ", " <<
updated_time << ", " << created_on << ", " << date << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();
}
if (SimpleResult res = createTableFileQuery.execute()) {
file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
//Consume all results to avoid "Commands out of sync" error
// while (createTableFileQuery.more_results()) {
// createTableFileQuery.store_next();
// }
} else {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE";
return Status::DBTransactionError("Add file Error", createTableFileQuery.error());
}
} // Scoped Connection
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE FILE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what());
} catch (std::exception& ex) {
return HandleException("Encounter exception when create table file", ex);
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
files.clear();
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query filesToIndexQuery = connectionPtr->query();
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
}
res = filesToIndexQuery.store();
} //Scoped Connection
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; //implicit conversion
std::string table_id;
resRow["table_id"].to_string(table_id);
table_file.table_id_ = table_id;
table_file.engine_type_ = resRow["engine_type"];
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
table_file.file_type_ = resRow["file_type"];
table_file.size_ = resRow["size"];
table_file.date_ = resRow["date"];
auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) {
TableSchema table_schema;
table_schema.table_id_ = table_file.table_id_;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
groups[table_file.table_id_] = table_schema;
// std::cout << table_schema.dimension_ << std::endl;
}
table_file.dimension_ = groups[table_file.table_id_].dimension_;
GetTableFilePath(table_file);
files.push_back(table_file);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO INDEX", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
files.clear();
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
if (partition.empty()) {
Query filesToSearchQuery = connectionPtr->query();
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
}
res = filesToSearchQuery.store();
} else {
Query filesToSearchQuery = connectionPtr->query();
std::stringstream partitionListSS;
for (auto &date : partition) {
partitionListSS << std::to_string(date) << ", ";
}
std::string partitionListStr = partitionListSS.str();
partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"date IN (" << partitionListStr << ") AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
}
res = filesToSearchQuery.store();
}
} //Scoped Connection
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; //implicit conversion
std::string table_id_str;
resRow["table_id"].to_string(table_id_str);
table_file.table_id_ = table_id_str;
table_file.engine_type_ = resRow["engine_type"];
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
table_file.file_type_ = resRow["file_type"];
table_file.size_ = resRow["size"];
table_file.date_ = resRow["date"];
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
DatePartionedTableFilesSchema &files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
files.clear();
try {
MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query filesToMergeQuery = connectionPtr->query();
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
"ORDER BY size DESC" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
}
res = filesToMergeQuery.store();
} //Scoped Connection
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; //implicit conversion
std::string table_id_str;
resRow["table_id"].to_string(table_id_str);
table_file.table_id_ = table_id_str;
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
table_file.file_type_ = resRow["file_type"];
table_file.size_ = resRow["size"];
table_file.date_ = resRow["date"];
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO MERGE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (ids.empty()) {
return Status::OK();
}
std::stringstream idSS;
for (auto& id : ids) {
idSS << "id = " << std::to_string(id) << " OR ";
}
std::string idStr = idSS.str();
idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
try {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query getTableFileQuery = connectionPtr->query();
getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(" << idStr << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();
}
res = getTableFileQuery.store();
} //Scoped Connection
assert(res);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
for (auto& resRow : res) {
TableFileSchema file_schema;
file_schema.table_id_ = table_id;
file_schema.engine_type_ = resRow["engine_type"];
std::string file_id;
resRow["file_id"].to_string(file_id);
file_schema.file_id_ = file_id;
file_schema.file_type_ = resRow["file_type"];
file_schema.size_ = resRow["size"];
file_schema.date_ = resRow["date"];
file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema);
table_files.emplace_back(file_schema);
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILES", er.what());
}
return Status::OK();
}
// PXU TODO: Support Swap
Status MySQLMetaImpl::Archive() {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
auto &criterias = options_.archive_conf.GetCriterias();
if (criterias.empty()) {
return Status::OK();
}
for (auto& kv : criterias) {
auto &criteria = kv.first;
auto &limit = kv.second;
if (criteria == "days") {
size_t usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query archiveQuery = connectionPtr->query();
archiveQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
"file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();
}
if (!archiveQuery.exec()) {
return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DURING ARCHIVE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DURING ARCHIVE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DURING ARCHIVE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DURING ARCHIVE", er.what());
}
}
if (criteria == "disk") {
uint64_t sum = 0;
Size(sum);
auto to_delete = (sum - limit * G);
DiscardFiles(to_delete);
}
}
return Status::OK();
}
Status MySQLMetaImpl::Size(uint64_t &result) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
result = 0;
try {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query getSizeQuery = connectionPtr->query();
getSizeQuery << "SELECT IFNULL(SUM(size),0) AS sum " <<
"FROM TableFiles " <<
"WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
}
res = getSizeQuery.store();
} //Scoped Connection
assert(res && res.num_rows() == 1);
// if (!res) {
//// std::cout << "result is NULL" << std::endl;
// return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", getSizeQuery.error());
// }
if (res.empty()) {
result = 0;
// std::cout << "result = 0" << std::endl;
}
else {
result = res[0]["sum"];
// std::cout << "result = " << std::to_string(result) << std::endl;
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING SIZE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING SIZE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING SIZE", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (to_discard_size <= 0) {
// std::cout << "in" << std::endl;
return Status::OK();
}
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query discardFilesQuery = connectionPtr->query();
discardFilesQuery << "SELECT id, size " <<
"FROM TableFiles " <<
"WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"ORDER BY id ASC " <<
"LIMIT 10;";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
}
// std::cout << discardFilesQuery.str() << std::endl;
StoreQueryResult res = discardFilesQuery.store();
assert(res);
if (res.num_rows() == 0) {
return Status::OK();
}
TableFileSchema table_file;
std::stringstream idsToDiscardSS;
for (auto &resRow : res) {
if (to_discard_size <= 0) {
break;
}
table_file.id_ = resRow["id"];
table_file.size_ = resRow["size"];
idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
<< " table_file.size=" << table_file.size_;
to_discard_size -= table_file.size_;
}
std::string idsToDiscardStr = idsToDiscardSS.str();
idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR "
discardFilesQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
"updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
"WHERE " << idsToDiscardStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
}
status = discardFilesQuery.exec();
if (!status) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES";
return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
}
} //Scoped Connection
return DiscardFiles(to_discard_size);
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DISCARDING FILES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DISCARDING FILES", er.what());
}
}
//ZR: this function assumes all fields in file_schema have value
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query updateTableFileQuery = connectionPtr->query();
//if the table has been deleted, just mark the table file as TO_DELETE
//clean thread will delete the file later
updateTableFileQuery << "SELECT state FROM Tables " <<
"WHERE table_id = " << quote << file_schema.table_id_ << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
}
StoreQueryResult res = updateTableFileQuery.store();
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
int state = res[0]["state"];
if (state == TableSchema::TO_DELETE) {
file_schema.file_type_ = TableFileSchema::TO_DELETE;
}
} else {
file_schema.file_type_ = TableFileSchema::TO_DELETE;
}
std::string id = std::to_string(file_schema.id_);
std::string table_id = file_schema.table_id_;
std::string engine_type = std::to_string(file_schema.engine_type_);
std::string file_id = file_schema.file_id_;
std::string file_type = std::to_string(file_schema.file_type_);
std::string size = std::to_string(file_schema.size_);
std::string updated_time = std::to_string(file_schema.updated_time_);
std::string created_on = std::to_string(file_schema.created_on_);
std::string date = std::to_string(file_schema.date_);
updateTableFileQuery << "UPDATE TableFiles " <<
"SET table_id = " << quote << table_id << ", " <<
"engine_type = " << engine_type << ", " <<
"file_id = " << quote << file_id << ", " <<
"file_type = " << file_type << ", " <<
"size = " << size << ", " <<
"updated_time = " << updated_time << ", " <<
"created_on = " << created_on << ", " <<
"date = " << date << " " <<
"WHERE id = " << id << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
}
// std::cout << updateTableFileQuery.str() << std::endl;
if (!updateTableFileQuery.exec()) {
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE";
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE",
updateTableFileQuery.error());
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILE", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query updateTableFilesQuery = connectionPtr->query();
std::map<std::string, bool> has_tables;
for (auto &file_schema : files) {
if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
continue;
}
updateTableFilesQuery << "SELECT EXISTS " <<
"(SELECT 1 FROM Tables " <<
"WHERE table_id = " << quote << file_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
"AS " << quote << "check" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
}
StoreQueryResult res = updateTableFilesQuery.store();
assert(res && res.num_rows() == 1);
int check = res[0]["check"];
has_tables[file_schema.table_id_] = (check == 1);
}
for (auto &file_schema : files) {
if (!has_tables[file_schema.table_id_]) {
file_schema.file_type_ = TableFileSchema::TO_DELETE;
}
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
std::string id = std::to_string(file_schema.id_);
std::string table_id = file_schema.table_id_;
std::string engine_type = std::to_string(file_schema.engine_type_);
std::string file_id = file_schema.file_id_;
std::string file_type = std::to_string(file_schema.file_type_);
std::string size = std::to_string(file_schema.size_);
std::string updated_time = std::to_string(file_schema.updated_time_);
std::string created_on = std::to_string(file_schema.created_on_);
std::string date = std::to_string(file_schema.date_);
updateTableFilesQuery << "UPDATE TableFiles " <<
"SET table_id = " << quote << table_id << ", " <<
"engine_type = " << engine_type << ", " <<
"file_id = " << quote << file_id << ", " <<
"file_type = " << file_type << ", " <<
"size = " << size << ", " <<
"updated_time = " << updated_time << ", " <<
"created_on = " << created_on << ", " <<
"date = " << date << " " <<
"WHERE id = " << id << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
}
if (!updateTableFilesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES";
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES",
updateTableFilesQuery.error());
}
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
// static int b_count = 0;
// b_count++;
// std::cout << "CleanUpFilesWithTTL: " << b_count << std::endl;
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
auto now = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
"FROM TableFiles " <<
"WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " <<
"updated_time < " << std::to_string(now - seconds * US_PS) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
for (auto &resRow : res) {
table_file.id_ = resRow["id"]; //implicit conversion
std::string table_id;
resRow["table_id"].to_string(table_id);
table_file.table_id_ = table_id;
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
table_file.date_ = resRow["date"];
GetTableFilePath(table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = "
<< table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
idsToDelete.emplace_back(std::to_string(table_file.id_));
}
if (!idsToDelete.empty()) {
std::stringstream idsToDeleteSS;
for (auto &id : idsToDelete) {
idsToDeleteSS << "id = " << id << " OR ";
}
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " <<
idsToDeleteStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
if (!cleanUpFilesWithTTLQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
return Status::DBTransactionError("CleanUpFilesWithTTL Error",
cleanUpFilesWithTTLQuery.error());
}
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
}
try {
MetricCollector metric;
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use after creating ScopedConnection = "
// << mysql_connection_pool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
"FROM Tables " <<
"WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
// std::cout << res.num_rows() << std::endl;
if (!res.empty()) {
std::stringstream idsToDeleteSS;
for (auto &resRow : res) {
size_t id = resRow["id"];
std::string table_id;
resRow["table_id"].to_string(table_id);
auto table_path = GetTablePath(table_id);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
}
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " <<
idsToDeleteStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
if (!cleanUpFilesWithTTLQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL",
cleanUpFilesWithTTLQuery.error());
}
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::CleanUp() {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
Query cleanUpQuery = connectionPtr->query();
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
}
if (!cleanUpQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES";
return Status::DBTransactionError("Clean up Error", cleanUpQuery.error());
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
MetricCollector metric;
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query countQuery = connectionPtr->query();
countQuery << "SELECT size " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
}
res = countQuery.store();
} //Scoped Connection
result = 0;
for (auto &resRow : res) {
size_t size = resRow["size"];
result += size;
}
assert(table_schema.dimension_ != 0);
result /= table_schema.dimension_;
result /= sizeof(float);
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING COUNT" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING COUNT", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING COUNT" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::DropAll() {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (boost::filesystem::is_directory(options_.path)) {
boost::filesystem::remove_all(options_.path);
}
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query dropTableQuery = connectionPtr->query();
dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();
}
if (dropTableQuery.exec()) {
return Status::OK();
}
else {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE";
return Status::DBTransactionError("DROP TABLE ERROR", dropTableQuery.error());
}
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN DROPPING TABLE", er.what());
} catch (const Exception& er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING TABLE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what());
}
return Status::OK();
}
MySQLMetaImpl::~MySQLMetaImpl() {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
CleanUp();
}
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Meta.h"
#include "Options.h"
#include "MySQLConnectionPool.h"
#include "mysql++/mysql++.h"
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
// auto StoragePrototype(const std::string& path);
using namespace mysqlpp;
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions& options_, const int& mode);
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) override;
virtual Status DeleteTable(const std::string& table_id) override;
virtual Status DeleteTableFiles(const std::string& table_id) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) override;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
virtual Status FilesToIndex(TableFilesSchema&) override;
virtual Status Archive() override;
virtual Status Size(uint64_t& result) override;
virtual Status CleanUp() override;
virtual Status CleanUpFilesWithTTL(uint16_t seconds) override;
virtual Status DropAll() override;
virtual Status Count(const std::string& table_id, uint64_t& result) override;
virtual ~MySQLMetaImpl();
private:
Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id);
Status DiscardFiles(long long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions options_;
const int mode_;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab = false;
// std::mutex connectionMutex_;
}; // DBMetaImpl
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -45,15 +45,23 @@ struct DBMetaOptions {
std::string path;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
bool sql_echo = false;
}; // DBMetaOptions
struct Options {
typedef enum {
SINGLE,
CLUSTER,
READ_ONLY
} MODE;
Options();
uint16_t memory_sync_interval = 1; //unit: second
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
}; // Options
......
......@@ -23,6 +23,33 @@ DBWrapper::DBWrapper() {
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
std::string sql_echo = config.GetValue(CONFIG_DB_SQL_ECHO, "off");
if (sql_echo == "on") {
opt.meta.sql_echo = true;
}
else if (sql_echo == "off") {
opt.meta.sql_echo = false;
}
else {
std::cout << "ERROR: sql_echo specified in db_config is not one of ['on', 'off']" << std::endl;
kill(0, SIGUSR1);
}
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (mode == "single") {
opt.mode = zilliz::milvus::engine::Options::MODE::SINGLE;
}
else if (mode == "cluster") {
opt.mode = zilliz::milvus::engine::Options::MODE::CLUSTER;
}
else if (mode == "read_only") {
opt.mode = zilliz::milvus::engine::Options::MODE::READ_ONLY;
}
else {
std::cout << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl;
kill(0, SIGUSR1);
}
//set archive config
engine::ArchiveConf::CriteriaT criterial;
......
......@@ -19,6 +19,7 @@ static const std::string CONFIG_SERVER_ADDRESS = "address";
static const std::string CONFIG_SERVER_PORT = "port";
static const std::string CONFIG_SERVER_PROTOCOL = "transfer_protocol";
static const std::string CONFIG_SERVER_MODE = "server_mode";
static const std::string CONFIG_CLUSTER_MODE = "mode";
static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url";
......@@ -26,6 +27,7 @@ static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const std::string CONFIG_DB_SQL_ECHO = "sql_echo";
static const std::string CONFIG_LOG = "log_config";
......
......@@ -7,6 +7,7 @@ GTEST_VERSION=1.8.1
JSONCONS_VERSION=0.126.0
LAPACK_VERSION=v3.8.0
LZ4_VERSION=v1.9.1
MYSQLPP_VERSION=3.2.4
OPENBLAS_VERSION=v0.3.6
PROMETHEUS_VERSION=v0.7.0
ROCKSDB_VERSION=v6.0.2
......
......@@ -5,15 +5,11 @@
#-------------------------------------------------------------------------------
link_directories(
"${CMAKE_BINARY_DIR}/lib"
"${GTEST_PREFIX}/lib/"
)
aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
message(STATUS "GTEST LIB: ${GTEST_PREFIX}/lib")
set(unittest_srcs
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
#${EASYLOGGINGPP_INCLUDE_DIR}/easylogging++.cc)
......
......@@ -21,10 +21,10 @@ set(db_scheduler_srcs
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
include_directories(/usr/include/mysql)
set(db_test_src
${unittest_srcs}
#${unittest_srcs}
${config_files}
${cache_srcs}
${db_srcs}
......@@ -44,6 +44,7 @@ set(db_libs
boost_system
boost_filesystem
lz4
mysqlpp
)
target_link_libraries(db_test ${db_libs} ${unittest_libs})
......
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <stdlib.h>
#include <time.h>
#include "utils.h"
#include "db/MySQLMetaImpl.h"
#include "db/Factories.h"
#include "db/Utils.h"
#include "db/MetaConsts.h"
#include "mysql++/mysql++.h"
#include <iostream>
using namespace zilliz::milvus::engine;
//TEST_F(MySQLTest, InitializeTest) {
// DBMetaOptions options;
// //dialect+driver://username:password@host:port/database
// options.backend_uri = "mysql://root:1234@:/test";
// meta::MySQLMetaImpl impl(options);
// auto status = impl.Initialize();
// std::cout << status.ToString() << std::endl;
// ASSERT_TRUE(status.ok());
//}
TEST_F(MySQLTest, core) {
// DBMetaOptions options;
// //dialect+driver://username:password@host:port/database
// options.backend_uri = "mysql://root:1234@:/test";
// options.path = "/tmp/vecwise_test";
int mode = Options::MODE::SINGLE;
meta::MySQLMetaImpl impl(getDBMetaOptions(), mode);
// auto status = impl.Initialize();
// ASSERT_TRUE(status.ok());
meta::TableSchema schema1;
schema1.table_id_ = "test1";
schema1.dimension_ = 123;
auto status = impl.CreateTable(schema1);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
meta::TableSchema schema2;
schema2.table_id_ = "test2";
schema2.dimension_ = 321;
status = impl.CreateTable(schema2);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
status = impl.CreateTable(schema2);
// std::cout << status.ToString() << std::endl;
// ASSERT_THROW(impl.CreateTable(schema), mysqlpp::BadQuery);
ASSERT_TRUE(status.ok());
status = impl.DeleteTable(schema2.table_id_);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
size_t id1 = schema1.id_;
long created_on1 = schema1.created_on_;
status = impl.DescribeTable(schema1);
ASSERT_TRUE(status.ok());
ASSERT_EQ(schema1.id_, id1);
ASSERT_EQ(schema1.table_id_, "test1");
ASSERT_EQ(schema1.created_on_, created_on1);
ASSERT_EQ(schema1.files_cnt_, 0);
ASSERT_EQ(schema1.engine_type_, 1);
ASSERT_EQ(schema1.store_raw_data_, false);
bool check;
status = impl.HasTable("test1", check);
ASSERT_TRUE(status.ok());
ASSERT_EQ(check, true);
std::vector<meta::TableSchema> table_schema_array;
status = impl.AllTables(table_schema_array);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_schema_array.size(), 1);
meta::TableSchema resultSchema = table_schema_array[0];
ASSERT_EQ(resultSchema.id_, id1);
ASSERT_EQ(resultSchema.table_id_, "test1");
ASSERT_EQ(resultSchema.dimension_, 123);
ASSERT_EQ(resultSchema.files_cnt_, 0);
ASSERT_EQ(resultSchema.engine_type_, 1);
ASSERT_EQ(resultSchema.store_raw_data_, false);
meta::TableFileSchema tableFileSchema;
tableFileSchema.table_id_ = "test1";
status = impl.CreateTableFile(tableFileSchema);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
tableFileSchema.file_type_ = meta::TableFileSchema::TO_INDEX;
status = impl.UpdateTableFile(tableFileSchema);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
meta::TableFilesSchema filesToIndex;
status = impl.FilesToIndex(filesToIndex);
ASSERT_TRUE(status.ok());
ASSERT_EQ(filesToIndex.size(), 1);
meta::TableFileSchema fileToIndex = filesToIndex[0];
ASSERT_EQ(fileToIndex.table_id_, "test1");
ASSERT_EQ(fileToIndex.dimension_, 123);
// meta::TableFilesSchema filesToIndex;
// status = impl.FilesToIndex(filesToIndex);
// ASSERT_TRUE(status.ok());
// ASSERT_EQ(filesToIndex.size(), 0);
meta::DatesT partition;
partition.push_back(tableFileSchema.date_);
meta::DatePartionedTableFilesSchema filesToSearch;
status = impl.FilesToSearch(tableFileSchema.table_id_, partition, filesToSearch);
ASSERT_TRUE(status.ok());
ASSERT_EQ(filesToSearch.size(), 1);
ASSERT_EQ(filesToSearch[tableFileSchema.date_].size(), 1);
meta::TableFileSchema fileToSearch = filesToSearch[tableFileSchema.date_][0];
ASSERT_EQ(fileToSearch.table_id_, "test1");
ASSERT_EQ(fileToSearch.dimension_, 123);
tableFileSchema.file_type_ = meta::TableFileSchema::RAW;
status = impl.UpdateTableFile(tableFileSchema);
ASSERT_TRUE(status.ok());
meta::DatePartionedTableFilesSchema filesToMerge;
status = impl.FilesToMerge(tableFileSchema.table_id_, filesToMerge);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
ASSERT_EQ(filesToMerge.size(), 1);
ASSERT_EQ(filesToMerge[tableFileSchema.date_].size(), 1);
meta::TableFileSchema fileToMerge = filesToMerge[tableFileSchema.date_][0];
ASSERT_EQ(fileToMerge.table_id_, "test1");
ASSERT_EQ(fileToMerge.dimension_, 123);
meta::TableFilesSchema resultTableFilesSchema;
std::vector<size_t> ids;
ids.push_back(tableFileSchema.id_);
status = impl.GetTableFiles(tableFileSchema.table_id_, ids, resultTableFilesSchema);
ASSERT_TRUE(status.ok());
ASSERT_EQ(resultTableFilesSchema.size(), 1);
meta::TableFileSchema resultTableFileSchema = resultTableFilesSchema[0];
// ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_);
ASSERT_EQ(resultTableFileSchema.table_id_, tableFileSchema.table_id_);
ASSERT_EQ(resultTableFileSchema.file_id_, tableFileSchema.file_id_);
ASSERT_EQ(resultTableFileSchema.file_type_, tableFileSchema.file_type_);
ASSERT_EQ(resultTableFileSchema.size_, tableFileSchema.size_);
ASSERT_EQ(resultTableFileSchema.date_, tableFileSchema.date_);
ASSERT_EQ(resultTableFileSchema.engine_type_, tableFileSchema.engine_type_);
ASSERT_EQ(resultTableFileSchema.dimension_, tableFileSchema.dimension_);
tableFileSchema.size_ = 234;
meta::TableSchema schema3;
schema3.table_id_ = "test3";
schema3.dimension_ = 321;
status = impl.CreateTable(schema3);
ASSERT_TRUE(status.ok());
meta::TableFileSchema tableFileSchema2;
tableFileSchema2.table_id_ = "test3";
tableFileSchema2.size_ = 345;
status = impl.CreateTableFile(tableFileSchema2);
ASSERT_TRUE(status.ok());
meta::TableFilesSchema filesToUpdate;
filesToUpdate.emplace_back(tableFileSchema);
filesToUpdate.emplace_back(tableFileSchema2);
status = impl.UpdateTableFile(tableFileSchema);
ASSERT_TRUE(status.ok());
uint64_t resultSize;
status = impl.Size(resultSize);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
ASSERT_EQ(resultSize, tableFileSchema.size_ + tableFileSchema2.size_);
uint64_t countResult;
status = impl.Count(tableFileSchema.table_id_, countResult);
ASSERT_TRUE(status.ok());
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MySQLTest, GROUP_TEST) {
int mode = Options::MODE::SINGLE;
meta::MySQLMetaImpl impl(getDBMetaOptions(), mode);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
ASSERT_TRUE(status.ok());
auto gid = group.id_;
group.id_ = -1;
status = impl.DescribeTable(group);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group.id_, gid);
ASSERT_EQ(group.table_id_, table_id);
group.table_id_ = "not_found";
status = impl.DescribeTable(group);
ASSERT_TRUE(!status.ok());
group.table_id_ = table_id;
status = impl.CreateTable(group);
ASSERT_TRUE(status.ok());
group.table_id_ = "";
status = impl.CreateTable(group);
ASSERT_TRUE(status.ok());
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MySQLTest, table_file_TEST) {
int mode = Options::MODE::SINGLE;
meta::MySQLMetaImpl impl(getDBMetaOptions(), mode);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
group.dimension_ = 256;
auto status = impl.CreateTable(group);
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
status = impl.CreateTableFile(table_file);
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
uint64_t cnt = 0;
status = impl.Count(table_id, cnt);
ASSERT_TRUE(status.ok());
ASSERT_EQ(cnt, 0UL);
auto file_id = table_file.file_id_;
auto new_file_type = meta::TableFileSchema::INDEX;
table_file.file_type_ = new_file_type;
status = impl.UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.file_type_, new_file_type);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_FALSE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
}
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_TRUE(status.ok());
table_file.date_ = meta::Meta::GetDateWithDelta(-2);
status = impl.UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
dates.clear();
dates.push_back(table_file.date_);
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_TRUE(status.ok());
std::vector<size_t> ids = {table_file.id_};
meta::TableFilesSchema files;
status = impl.GetTableFiles(table_file.table_id_, ids, files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), 1UL);
ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE);
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) {
srand(time(0));
DBMetaOptions options = getDBMetaOptions();
int days_num = rand() % 100;
std::stringstream ss;
ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str());
int mode = Options::MODE::SINGLE;
meta::MySQLMetaImpl impl(options, mode);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
auto cnt = 100;
long ts = utils::GetMicroSecTimeStamp();
std::vector<int> days;
std::vector<size_t> ids;
for (auto i=0; i<cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
int day = rand() % (days_num*2);
table_file.created_on_ = ts - day*meta::D_SEC*meta::US_PS - 10000;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
days.push_back(day);
ids.push_back(table_file.id_);
}
impl.Archive();
int i = 0;
meta::TableFilesSchema files_get;
status = impl.GetTableFiles(table_file.table_id_, ids, files_get);
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
} else {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::TO_DELETE);
}
i++;
}
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MySQLTest, ARCHIVE_TEST_DISK) {
DBMetaOptions options = getDBMetaOptions();
options.archive_conf = ArchiveConf("delete", "disk:11");
int mode = Options::MODE::SINGLE;
auto impl = meta::MySQLMetaImpl(options, mode);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
auto cnt = 10;
auto each_size = 2UL;
std::vector<size_t> ids;
for (auto i=0; i<cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
table_file.size_ = each_size * meta::G;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
ids.push_back(table_file.id_);
}
impl.Archive();
int i = 0;
meta::TableFilesSchema files_get;
status = impl.GetTableFiles(table_file.table_id_, ids, files_get);
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (i < 5) {
ASSERT_TRUE(file.file_type_ == meta::TableFileSchema::TO_DELETE);
} else {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
}
++i;
}
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MySQLTest, TABLE_FILES_TEST) {
int mode = Options::MODE::SINGLE;
auto impl = meta::MySQLMetaImpl(getDBMetaOptions(), mode);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
int new_files_cnt = 4;
int raw_files_cnt = 5;
int to_index_files_cnt = 6;
int index_files_cnt = 7;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
for (auto i=0; i<new_files_cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
status = impl.UpdateTableFile(table_file);
}
for (auto i=0; i<raw_files_cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::RAW;
status = impl.UpdateTableFile(table_file);
}
for (auto i=0; i<to_index_files_cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
status = impl.UpdateTableFile(table_file);
}
for (auto i=0; i<index_files_cnt; ++i) {
status = impl.CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::INDEX;
status = impl.UpdateTableFile(table_file);
}
meta::TableFilesSchema files;
status = impl.FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedTableFilesSchema dated_files;
status = impl.FilesToMerge(group.table_id_, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);
status = impl.FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {table_file.date_};
status = impl.FilesToSearch(table_id, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
......@@ -12,23 +12,24 @@
#include "db/DB.h"
#include "db/DBImpl.h"
#include "db/MetaConsts.h"
#include "db/Factories.h"
using namespace zilliz::milvus;
namespace {
static const std::string TABLE_NAME = "test_group";
static constexpr int64_t TABLE_DIM = 256;
static const std::string TABLE_NAME = "test_group";
static constexpr int64_t TABLE_DIM = 256;
engine::meta::TableSchema BuildTableSchema() {
engine::meta::TableSchema BuildTableSchema() {
engine::meta::TableSchema table_info;
table_info.dimension_ = TABLE_DIM;
table_info.table_id_ = TABLE_NAME;
table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
return table_info;
}
}
void BuildVectors(int64_t n, std::vector<float>& vectors) {
void BuildVectors(int64_t n, std::vector<float>& vectors) {
vectors.clear();
vectors.resize(n*TABLE_DIM);
float* data = vectors.data();
......@@ -36,7 +37,7 @@ void BuildVectors(int64_t n, std::vector<float>& vectors) {
for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
data[TABLE_DIM * i] += i / 2000.;
}
}
}
}
......@@ -292,3 +293,251 @@ TEST_F(DBTest2, DELETE_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
};
TEST_F(MySQLDBTest, DB_TEST) {
auto options = GetOptions();
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int64_t nb = 50;
std::vector<float> xb;
BuildVectors(nb, xb);
int64_t qb = 5;
std::vector<float> qxb;
BuildVectors(qb, qxb);
std::thread search([&]() {
engine::QueryResults results;
int k = 10;
std::this_thread::sleep_for(std::chrono::seconds(2));
INIT_TIMER;
std::stringstream ss;
uint64_t count = 0;
uint64_t prev_count = 0;
for (auto j=0; j<10; ++j) {
ss.str("");
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
for (auto k=0; k<qb; ++k) {
ASSERT_EQ(results[k][0].first, target_ids[k]);
ss.str("");
ss << "Result [" << k << "]:";
for (auto result : results[k]) {
ss << result.first << " ";
}
/* LOG(DEBUG) << ss.str(); */
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
int loop = 100000;
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->InsertVectors(TABLE_NAME, qb, qxb.data(), target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
search.join();
delete db_;
auto dummyDB = engine::DBFactory::Build(options);
dummyDB->DropAll();
delete dummyDB;
};
TEST_F(MySQLDBTest, SEARCH_TEST) {
auto options = GetOptions();
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
// prepare raw data
size_t nb = 250000;
size_t nq = 10;
size_t k = 5;
std::vector<float> xb(nb*TABLE_DIM);
std::vector<float> xq(nq*TABLE_DIM);
std::vector<long> ids(nb);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
for (size_t i = 0; i < nb*TABLE_DIM; i++) {
xb[i] = dis_xt(gen);
if (i < nb){
ids[i] = i;
}
}
for (size_t i = 0; i < nq*TABLE_DIM; i++) {
xq[i] = dis_xt(gen);
}
// result data
//std::vector<long> nns_gt(k*nq);
std::vector<long> nns(k*nq); // nns = nearst neg search
//std::vector<float> dis_gt(k*nq);
std::vector<float> dis(k*nq);
// insert data
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
}
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results);
ASSERT_STATS(stat);
delete db_;
auto dummyDB = engine::DBFactory::Build(options);
dummyDB->DropAll();
delete dummyDB;
// TODO(linxj): add groundTruth assert
};
TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) {
auto options = GetOptions();
options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
std::vector<engine::meta::TableSchema> table_schema_array;
stat = db_->AllTables(table_schema_array);
ASSERT_STATS(stat);
bool bfound = false;
for(auto& schema : table_schema_array) {
if(schema.table_id_ == TABLE_NAME) {
bfound = true;
break;
}
}
ASSERT_TRUE(bfound);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
uint64_t size;
db_->Size(size);
int64_t nb = 10;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 100000;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
db_->Size(size);
LOG(DEBUG) << "size=" << size;
ASSERT_LE(size, 1 * engine::meta::G);
delete db_;
auto dummyDB = engine::DBFactory::Build(options);
dummyDB->DropAll();
delete dummyDB;
};
TEST_F(MySQLDBTest, DELETE_TEST) {
auto options = GetOptions();
options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
// std::cout << stat.ToString() << std::endl;
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
// std::cout << "location: " << table_info_get.location_ << std::endl;
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_));
engine::IDNumbers vector_ids;
uint64_t size;
db_->Size(size);
int64_t nb = 100000;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 20;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::vector<engine::meta::DateT> dates;
stat = db_->DeleteTable(TABLE_NAME, dates);
// std::cout << "5 sec start" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
// std::cout << "5 sec finish" << std::endl;
ASSERT_TRUE(stat.ok());
// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
delete db_;
auto dummyDB = engine::DBFactory::Build(options);
dummyDB->DropAll();
delete dummyDB;
};
......@@ -11,9 +11,29 @@
#include "utils.h"
#include "db/Factories.h"
#include "db/Options.h"
INITIALIZE_EASYLOGGINGPP
using namespace zilliz::milvus;
static std::string uri;
class DBTestEnvironment : public ::testing::Environment {
public:
// explicit DBTestEnvironment(std::string uri) : uri_(uri) {}
static std::string getURI() {
return uri;
}
void SetUp() override {
getURI();
}
};
void ASSERT_STATS(engine::Status& stat) {
ASSERT_TRUE(stat.ok());
if(!stat.ok()) {
......@@ -21,6 +41,7 @@ void ASSERT_STATS(engine::Status& stat) {
}
}
void DBTest::InitLog() {
el::Configurations defaultConf;
defaultConf.setToDefault();
......@@ -32,6 +53,7 @@ void DBTest::InitLog() {
engine::Options DBTest::GetOptions() {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
return options;
}
......@@ -50,6 +72,7 @@ engine::Options DBTest2::GetOptions() {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
options.meta.backend_uri = "sqlite://:@:/";
return options;
}
......@@ -61,3 +84,29 @@ void MetaTest::SetUp() {
void MetaTest::TearDown() {
impl_->DropAll();
}
zilliz::milvus::engine::DBMetaOptions MySQLTest::getDBMetaOptions() {
// std::string path = "/tmp/milvus_test";
// engine::DBMetaOptions options = engine::DBMetaOptionsFactory::Build(path);
zilliz::milvus::engine::DBMetaOptions options;
options.path = "/tmp/milvus_test";
options.backend_uri = DBTestEnvironment::getURI();
return options;
}
zilliz::milvus::engine::Options MySQLDBTest::GetOptions() {
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = DBTestEnvironment::getURI();
return options;
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
if (argc > 1) {
uri = argv[1];
}
// std::cout << uri << std::endl;
::testing::AddGlobalTestEnvironment(new DBTestEnvironment);
return RUN_ALL_TESTS();
}
......@@ -8,9 +8,11 @@
#include <gtest/gtest.h>
#include <chrono>
//#include <src/db/MySQLMetaImpl.h>
#include "db/DB.h"
#include "db/DBMetaImpl.h"
#include "db/MySQLMetaImpl.h"
#define TIMING
......@@ -28,9 +30,28 @@
#define STOP_TIMER(name)
#endif
void ASSERT_STATS(zilliz::milvus::engine::Status& stat);
//class TestEnv : public ::testing::Environment {
//public:
//
// static std::string getURI() {
// if (const char* uri = std::getenv("MILVUS_DBMETA_URI")) {
// return uri;
// }
// else {
// return "";
// }
// }
//
// void SetUp() override {
// getURI();
// }
//
//};
//
//::testing::Environment* const test_env =
// ::testing::AddGlobalTestEnvironment(new TestEnv);
class DBTest : public ::testing::Test {
protected:
......@@ -55,3 +76,14 @@ protected:
virtual void SetUp() override;
virtual void TearDown() override;
};
class MySQLTest : public ::testing::Test {
protected:
// std::shared_ptr<zilliz::milvus::engine::meta::MySQLMetaImpl> impl_;
zilliz::milvus::engine::DBMetaOptions getDBMetaOptions();
};
class MySQLDBTest : public ::testing::Test {
protected:
zilliz::milvus::engine::Options GetOptions();
};
......@@ -2,11 +2,14 @@ server_config:
address: 0.0.0.0
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster
mode: single # milvus deployment type: single, cluster, read_only
db_config:
db_path: /tmp/milvus # milvus data storage path
db_backend_url: http://127.0.0.1 # meta database uri
#URI format: dialect://username:password@host:port/database
#All parts except dialect are optional, but you MUST include the delimiters
#Currently supports mysql or sqlite
db_backend_url: mysql://root:1234@:/test # meta database uri
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册