提交 3ccaf540 编写于 作者: Z zhiru

update


Former-commit-id: 8c9ef2450105713ab756e292c47b8c835ea32f13
......@@ -21,6 +21,10 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-92 - Unify behavior of debug and release build
- MS-98 - Install all unit test to installation directory
- MS-115 - Change is_startup of metric_config switch from true to on
- MS-122 - Archive criteria config
- MS-124 - HasTable interface
- MS-126 - Add more error code
## New Feature
- MS-57 - Implement index load/search pipeline
......
......@@ -11,6 +11,8 @@ db_config:
#Currently supports mysql or sqlite
db_backend_url: mysql://root:1234@:/test # meta database uri
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
metric_config:
is_startup: off # if monitoring start: on, off
......
......@@ -193,9 +193,11 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto table = ConnectorPtr->select(columns(&TableSchema::state_),
where(c(&TableSchema::table_id_) == table_schema.table_id_));
if (table.size() == 1) {
std::string msg = (TableSchema::TO_DELETE == std::get<0>(table[0])) ?
"Table already exists and it is in delete state, please wait a second" : "Table already exists";
return Status::Error(msg);
if(TableSchema::TO_DELETE == std::get<0>(table[0])) {
return Status::Error("Table already exists and it is in delete state, please wait a second");
} else {
return Status::OK();//table already exists, no error
}
}
}
......@@ -329,7 +331,7 @@ Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
}
} catch (std::exception &e) {
HandleException("Encounter exception when lookup table", e);
return HandleException("Encounter exception when lookup table", e);
}
return Status::OK();
......@@ -359,7 +361,7 @@ Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
}
} catch (std::exception &e) {
HandleException("Encounter exception when lookup all tables", e);
return HandleException("Encounter exception when lookup all tables", e);
}
return Status::OK();
......@@ -656,7 +658,7 @@ Status DBMetaImpl::Archive() {
for (auto kv : criterias) {
auto &criteria = kv.first;
auto &limit = kv.second;
if (criteria == "days") {
if (criteria == engine::ARCHIVE_CONF_DAYS) {
long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
try {
......@@ -672,11 +674,11 @@ Status DBMetaImpl::Archive() {
return HandleException("Encounter exception when update table files", e);
}
}
if (criteria == "disk") {
if (criteria == engine::ARCHIVE_CONF_DISK) {
uint64_t sum = 0;
Size(sum);
auto to_delete = (sum - limit * G);
int64_t to_delete = (int64_t)sum - limit * G;
DiscardFiles(to_delete);
}
}
......
......@@ -81,11 +81,11 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
std::string dialect = pieces_match[1].str();
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_DEBUG << "Using MySQL";
ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions));
}
else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_DEBUG << "Using SQLite";
ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
}
else {
......
......@@ -146,11 +146,16 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
Status MemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
MemIdMap temp_map;
for (auto& kv: mem_id_map_) {
if(kv.second->RowCount() == 0) {
temp_map.insert(kv);
continue;//empty vector, no need to serialize
}
immu_mem_list_.push_back(kv.second);
}
mem_id_map_.clear();
mem_id_map_.swap(temp_map);
return Status::OK();
}
......@@ -168,8 +173,21 @@ Status MemManager::Serialize(std::set<std::string>& table_ids) {
}
Status MemManager::EraseMemVector(const std::string& table_id) {
std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id);
{//erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id);
}
{//erase MemVector from serialize cache
std::unique_lock<std::mutex> lock(serialization_mtx_);
MemList temp_list;
for (auto& mem : immu_mem_list_) {
if(mem->TableId() != table_id) {
temp_list.push_back(mem);
}
}
immu_mem_list_.swap(temp_list);
}
return Status::OK();
}
......
......@@ -45,6 +45,8 @@ public:
const std::string& Location() const { return schema_.location_; }
std::string TableId() const { return schema_.table_id_; }
private:
MemVectors() = delete;
MemVectors(const MemVectors&) = delete;
......
......@@ -2,6 +2,7 @@
#include <string>
#include <unistd.h>
#include <atomic>
#include "Log.h"
......@@ -53,7 +54,7 @@ public:
// Other half of in-use conn count limit
void release(const mysqlpp::Connection* pc) override {
mysqlpp::ConnectionPool::release(pc);
// ENGINE_LOG_DEBUG << "conns_in_use_ in release: " << conns_in_use_ << std::endl;
if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl;
}
......@@ -62,6 +63,10 @@ public:
}
}
int getConnectionsInUse() {
return conns_in_use_;
}
void set_max_idle_time(int max_idle) {
maxIdleTime_ = max_idle;
}
......@@ -96,7 +101,7 @@ protected:
private:
// Number of connections currently in use
int conns_in_use_;
std::atomic<int> conns_in_use_;
// Our connection parameters
std::string db_, user_, password_, server_;
......
......@@ -158,13 +158,15 @@ namespace meta {
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
mySQLConnectionPool_ = std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl;
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
try {
CleanUp();
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
// return Status::Error("DB connection failed: ", connectionPtr->error());
// }
......@@ -281,6 +283,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query dropPartitionsByDatesQuery = connectionPtr->query();
dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
......@@ -294,7 +300,6 @@ namespace meta {
dropPartitionsByDatesQuery.error());
}
} //Scoped Connection
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what();
......@@ -319,6 +324,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query createTableQuery = connectionPtr->query();
// ENGINE_LOG_DEBUG << "Create Table in";
if (table_schema.table_id_.empty()) {
......@@ -331,11 +340,12 @@ namespace meta {
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
int state = res[0]["state"];
std::string msg = (TableSchema::TO_DELETE == state) ?
"Table already exists and it is in delete state, please wait a second"
: "Table already exists";
ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable: " << msg;
return Status::Error(msg);
if (TableSchema::TO_DELETE == state) {
return Status::Error("Table already exists and it is in delete state, please wait a second");
}
else {
return Status::OK();//table already exists, no error
}
}
}
// ENGINE_LOG_DEBUG << "Create Table start";
......@@ -411,6 +421,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
//soft delete table
Query deleteTableQuery = connectionPtr->query();
//
......@@ -444,6 +458,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
//soft delete table files
Query deleteTableFilesQuery = connectionPtr->query();
//
......@@ -484,6 +502,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query describeTableQuery = connectionPtr->query();
describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " <<
"FROM Tables " <<
......@@ -539,6 +561,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query hasTableQuery = connectionPtr->query();
//since table_id is a unique column we just need to check whether it exists or not
hasTableQuery << "SELECT EXISTS " <<
......@@ -579,6 +605,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query allTablesQuery = connectionPtr->query();
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
"FROM Tables " <<
......@@ -658,6 +688,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query createTableFileQuery = connectionPtr->query();
createTableFileQuery << "INSERT INTO TableFiles VALUES" <<
......@@ -718,6 +752,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query filesToIndexQuery = connectionPtr->query();
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
......@@ -794,6 +832,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
if (partition.empty()) {
Query filesToSearchQuery = connectionPtr->query();
......@@ -895,6 +937,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query filesToMergeQuery = connectionPtr->query();
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " <<
"FROM TableFiles " <<
......@@ -980,6 +1026,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query getTableFileQuery = connectionPtr->query();
getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
......@@ -1055,6 +1105,10 @@ namespace meta {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query archiveQuery = connectionPtr->query();
archiveQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
......@@ -1098,6 +1152,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query getSizeQuery = connectionPtr->query();
getSizeQuery << "SELECT SUM(size) AS sum " <<
"FROM TableFiles " <<
......@@ -1151,6 +1209,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query discardFilesQuery = connectionPtr->query();
discardFilesQuery << "SELECT id, size " <<
"FROM TableFiles " <<
......@@ -1220,6 +1282,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query updateTableFileQuery = connectionPtr->query();
//if the table has been deleted, just mark the table file as TO_DELETE
......@@ -1293,6 +1359,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query updateTableFilesQuery = connectionPtr->query();
std::map<std::string, bool> has_tables;
......@@ -1373,8 +1443,17 @@ namespace meta {
MetricCollector metric;
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
"FROM TableFiles " <<
......@@ -1443,8 +1522,16 @@ namespace meta {
MetricCollector metric;
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use after creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
"FROM Tables " <<
......@@ -1500,6 +1587,10 @@ namespace meta {
try {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
Query cleanUpQuery = connectionPtr->query();
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
......@@ -1542,6 +1633,10 @@ namespace meta {
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query countQuery = connectionPtr->query();
countQuery << "SELECT size " <<
"FROM TableFiles " <<
......@@ -1585,6 +1680,10 @@ namespace meta {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// }
Query dropTableQuery = connectionPtr->query();
dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";
if (dropTableQuery.exec()) {
......
......@@ -24,6 +24,12 @@ ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias)
ParseCritirias(criterias);
}
void ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT& criterial) {
for(auto& pair : criterial) {
criterias_[pair.first] = pair.second;
}
}
void ArchiveConf::ParseCritirias(const std::string& criterias) {
std::stringstream ss(criterias);
std::vector<std::string> tokens;
......
......@@ -19,14 +19,20 @@ static constexpr uint64_t ONE_KB = 1024;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const std::string ARCHIVE_CONF_DISK = "disk";
static const std::string ARCHIVE_CONF_DAYS = "days";
static const std::string ARCHIVE_CONF_DEFAULT = ARCHIVE_CONF_DISK + ":512";
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
ArchiveConf(const std::string& type, const std::string& criterias = "disk:512");
ArchiveConf(const std::string& type, const std::string& criterias = ARCHIVE_CONF_DEFAULT);
const std::string& GetType() const { return type_; }
const CriteriaT GetCriterias() const { return criterias_; }
void SetCriterias(const ArchiveConf::CriteriaT& criterial);
private:
void ParseCritirias(const std::string& type);
void ParseType(const std::string& criterias);
......
......@@ -165,6 +165,11 @@ ClientTest::Test(const std::string& address, const std::string& port) {
Status stat = conn->CreateTable(tb_schema);
std::cout << "CreateTable function call status: " << stat.ToString() << std::endl;
PrintTableSchema(tb_schema);
bool has_table = conn->HasTable(tb_schema.table_name);
if(has_table) {
std::cout << "Table is created" << std::endl;
}
}
{//describe table
......
......@@ -156,6 +156,18 @@ public:
virtual Status CreateTable(const TableSchema &param) = 0;
/**
* @brief Test table existence method
*
* This method is used to create table
*
* @param table_name, table name is going to be tested.
*
* @return Indicate if table is cexist
*/
virtual bool HasTable(const std::string &table_name) = 0;
/**
* @brief Delete table method
*
......
......@@ -102,6 +102,15 @@ ClientProxy::CreateTable(const TableSchema &param) {
return Status::OK();
}
bool
ClientProxy::HasTable(const std::string &table_name) {
if(!IsConnected()) {
return false;
}
return ClientPtr()->interface()->HasTable(table_name);
}
Status
ClientProxy::DeleteTable(const std::string &table_name) {
if(!IsConnected()) {
......
......@@ -23,6 +23,8 @@ public:
virtual Status CreateTable(const TableSchema &param) override;
virtual bool HasTable(const std::string &table_name) override;
virtual Status DeleteTable(const std::string &table_name) override;
virtual Status AddVector(const std::string &table_name,
......
......@@ -56,6 +56,11 @@ ConnectionImpl::CreateTable(const TableSchema &param) {
return client_proxy_->CreateTable(param);
}
bool
ConnectionImpl::HasTable(const std::string &table_name) {
return client_proxy_->HasTable(table_name);
}
Status
ConnectionImpl::DeleteTable(const std::string &table_name) {
return client_proxy_->DeleteTable(table_name);
......
......@@ -25,6 +25,8 @@ public:
virtual Status CreateTable(const TableSchema &param) override;
virtual bool HasTable(const std::string &table_name) override;
virtual Status DeleteTable(const std::string &table_name) override;
virtual Status AddVector(const std::string &table_name,
......
......@@ -23,16 +23,30 @@ DBWrapper::DBWrapper() {
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
opt.mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
// std::cout << "mode = " << opt.mode << std::endl;
CommonUtil::CreateDirectory(opt.meta.path);
//set archive config
engine::ArchiveConf::CriteriaT criterial;
int64_t disk = config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0);
int64_t days = config.GetInt64Value(CONFIG_DB_ARCHIVE_DAYS, 0);
if(disk > 0) {
criterial[engine::ARCHIVE_CONF_DISK] = disk;
}
if(days > 0) {
criterial[engine::ARCHIVE_CONF_DAYS] = days;
}
opt.meta.archive_conf.SetCriterias(criterial);
//create db root folder
ServerError err = CommonUtil::CreateDirectory(opt.meta.path);
if(err != SERVER_SUCCESS) {
std::cout << "ERROR! Failed to create database root path: " << opt.meta.path << std::endl;
kill(0, SIGUSR1);
}
zilliz::milvus::engine::DB::Open(opt, &db_);
if(db_ == nullptr) {
SERVER_LOG_ERROR << "Failed to open db. Provided database uri = " << opt.meta.backend_uri;
throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
std::cout << "ERROR! Failed to open database" << std::endl;
kill(0, SIGUSR1);
}
}
......
......@@ -8,7 +8,6 @@
#include "ServerConfig.h"
#include "ThreadPoolServer.h"
#include "DBWrapper.h"
#include "utils/Log.h"
#include "milvus_types.h"
#include "milvus_constants.h"
......@@ -24,6 +23,7 @@
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thread>
#include <iostream>
namespace zilliz {
namespace milvus {
......@@ -68,7 +68,7 @@ MilvusServer::StartService() {
} else if (protocol == "compact") {
protocol_factory.reset(new TCompactProtocolFactory());
} else {
SERVER_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently";
//SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently";
return;
}
......@@ -89,11 +89,12 @@ MilvusServer::StartService() {
threadManager));
s_server->serve();
} else {
SERVER_LOG_ERROR << "Service mode: " << mode << " is not supported currently";
//SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently";
return;
}
} catch (apache::thrift::TException& ex) {
SERVER_LOG_ERROR << "Server encounter exception: " << ex.what();
std::cout << "ERROR! " << ex.what() << std::endl;
kill(0, SIGUSR1);
}
}
......
......@@ -24,6 +24,15 @@ RequestHandler::CreateTable(const thrift::TableSchema &param) {
RequestScheduler::ExecTask(task_ptr);
}
bool
RequestHandler::HasTable(const std::string &table_name) {
bool has_table = false;
BaseTaskPtr task_ptr = HasTableTask::Create(table_name, has_table);
RequestScheduler::ExecTask(task_ptr);
return has_table;
}
void
RequestHandler::DeleteTable(const std::string &table_name) {
BaseTaskPtr task_ptr = DeleteTableTask::Create(table_name);
......
......@@ -19,16 +19,28 @@ public:
RequestHandler();
/**
* @brief Create table method
*
* This method is used to create table
*
* @param param, use to provide table information to be created.
*
*
* @param param
*/
void CreateTable(const ::milvus::thrift::TableSchema& param);
* @brief Create table method
*
* This method is used to create table
*
* @param param, use to provide table information to be created.
*
*
* @param param
*/
void CreateTable(const ::milvus::thrift::TableSchema &param);
/**
* @brief Test table existence method
*
* This method is used to test table existence.
*
* @param table_name, table name is going to be tested.
*
*
* @param table_name
*/
bool HasTable(const std::string &table_name);
/**
* @brief Delete table method
......
......@@ -18,34 +18,34 @@ using namespace ::milvus;
namespace {
const std::map<ServerError, thrift::ErrorCode::type> &ErrorMap() {
static const std::map<ServerError, thrift::ErrorCode::type> code_map = {
{SERVER_UNEXPECTED_ERROR, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_NULL_POINTER, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_UNEXPECTED_ERROR, thrift::ErrorCode::UNEXPECTED_ERROR},
{SERVER_UNSUPPORTED_ERROR, thrift::ErrorCode::UNEXPECTED_ERROR},
{SERVER_NULL_POINTER, thrift::ErrorCode::UNEXPECTED_ERROR},
{SERVER_INVALID_ARGUMENT, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_NOT_IMPLEMENT, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_BLOCKING_QUEUE_EMPTY, thrift::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, thrift::ErrorCode::FILE_NOT_FOUND},
{SERVER_NOT_IMPLEMENT, thrift::ErrorCode::UNEXPECTED_ERROR},
{SERVER_BLOCKING_QUEUE_EMPTY, thrift::ErrorCode::UNEXPECTED_ERROR},
{SERVER_CANNOT_CREATE_FOLDER, thrift::ErrorCode::CANNOT_CREATE_FOLDER},
{SERVER_CANNOT_CREATE_FILE, thrift::ErrorCode::CANNOT_CREATE_FILE},
{SERVER_CANNOT_DELETE_FOLDER, thrift::ErrorCode::CANNOT_DELETE_FOLDER},
{SERVER_CANNOT_DELETE_FILE, thrift::ErrorCode::CANNOT_DELETE_FILE},
{SERVER_TABLE_NOT_EXIST, thrift::ErrorCode::TABLE_NOT_EXISTS},
{SERVER_INVALID_TABLE_NAME, thrift::ErrorCode::ILLEGAL_TABLE_NAME},
{SERVER_INVALID_TABLE_DIMENSION, thrift::ErrorCode::ILLEGAL_DIMENSION},
{SERVER_INVALID_TIME_RANGE, thrift::ErrorCode::ILLEGAL_RANGE},
{SERVER_INVALID_VECTOR_DIMENSION, thrift::ErrorCode::ILLEGAL_DIMENSION},
};
return code_map;
}
const std::map<ServerError, std::string> &ErrorMessage() {
static const std::map<ServerError, std::string> msg_map = {
{SERVER_UNEXPECTED_ERROR, "unexpected error occurs"},
{SERVER_NULL_POINTER, "null pointer error"},
{SERVER_INVALID_ARGUMENT, "invalid argument"},
{SERVER_FILE_NOT_FOUND, "file not found"},
{SERVER_NOT_IMPLEMENT, "not implemented"},
{SERVER_BLOCKING_QUEUE_EMPTY, "queue empty"},
{SERVER_TABLE_NOT_EXIST, "table not exist"},
{SERVER_INVALID_TIME_RANGE, "invalid time range"},
{SERVER_INVALID_VECTOR_DIMENSION, "invalid vector dimension"},
{SERVER_INVALID_INDEX_TYPE, thrift::ErrorCode::ILLEGAL_INDEX_TYPE},
{SERVER_INVALID_ROWRECORD, thrift::ErrorCode::ILLEGAL_ROWRECORD},
{SERVER_INVALID_ROWRECORD_ARRAY, thrift::ErrorCode::ILLEGAL_ROWRECORD},
{SERVER_INVALID_TOPK, thrift::ErrorCode::ILLEGAL_TOPK},
{SERVER_ILLEGAL_VECTOR_ID, thrift::ErrorCode::ILLEGAL_VECTOR_ID},
{SERVER_ILLEGAL_SEARCH_RESULT, thrift::ErrorCode::ILLEGAL_SEARCH_RESULT},
{SERVER_CACHE_ERROR, thrift::ErrorCode::CACHE_FAILED},
{DB_META_TRANSACTION_FAILED, thrift::ErrorCode::META_FAILED},
};
return msg_map;
return code_map;
}
}
......@@ -69,6 +69,14 @@ ServerError BaseTask::Execute() {
return error_code_;
}
ServerError BaseTask::SetError(ServerError error_code, const std::string& error_msg) {
error_code_ = error_code;
error_msg_ = error_msg;
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
ServerError BaseTask::WaitToFinish() {
std::unique_lock <std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
......@@ -102,7 +110,7 @@ void RequestScheduler::ExecTask(BaseTaskPtr& task_ptr) {
ex.__set_code(ErrorMap().at(err));
std::string msg = task_ptr->ErrorMsg();
if(msg.empty()){
msg = ErrorMessage().at(err);
msg = "Error message not set";
}
ex.__set_reason(msg);
throw ex;
......
......@@ -34,6 +34,8 @@ public:
protected:
virtual ServerError OnExecute() = 0;
ServerError SetError(ServerError error_code, const std::string& msg);
protected:
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
......
此差异已折叠。
......@@ -33,6 +33,22 @@ private:
const ::milvus::thrift::TableSchema& schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class HasTableTask : public BaseTask {
public:
static BaseTaskPtr Create(const std::string& table_name, bool& has_table);
protected:
HasTableTask(const std::string& table_name, bool& has_table);
ServerError OnExecute() override;
private:
std::string table_name_;
bool& has_table_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeTableTask : public BaseTask {
public:
......
......@@ -25,6 +25,8 @@ static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url";
static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const std::string CONFIG_LOG = "log_config";
......
......@@ -196,6 +196,213 @@ uint32_t MilvusService_CreateTable_presult::read(::apache::thrift::protocol::TPr
}
MilvusService_HasTable_args::~MilvusService_HasTable_args() throw() {
}
uint32_t MilvusService_HasTable_args::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 2:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->table_name);
this->__isset.table_name = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MilvusService_HasTable_args::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("MilvusService_HasTable_args");
xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeString(this->table_name);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_HasTable_pargs::~MilvusService_HasTable_pargs() throw() {
}
uint32_t MilvusService_HasTable_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("MilvusService_HasTable_pargs");
xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeString((*(this->table_name)));
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_HasTable_result::~MilvusService_HasTable_result() throw() {
}
uint32_t MilvusService_HasTable_result::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 0:
if (ftype == ::apache::thrift::protocol::T_BOOL) {
xfer += iprot->readBool(this->success);
this->__isset.success = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->e.read(iprot);
this->__isset.e = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MilvusService_HasTable_result::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("MilvusService_HasTable_result");
if (this->__isset.success) {
xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_BOOL, 0);
xfer += oprot->writeBool(this->success);
xfer += oprot->writeFieldEnd();
} else if (this->__isset.e) {
xfer += oprot->writeFieldBegin("e", ::apache::thrift::protocol::T_STRUCT, 1);
xfer += this->e.write(oprot);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_HasTable_presult::~MilvusService_HasTable_presult() throw() {
}
uint32_t MilvusService_HasTable_presult::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 0:
if (ftype == ::apache::thrift::protocol::T_BOOL) {
xfer += iprot->readBool((*(this->success)));
this->__isset.success = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->e.read(iprot);
this->__isset.e = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
MilvusService_DeleteTable_args::~MilvusService_DeleteTable_args() throw() {
}
......@@ -2290,6 +2497,67 @@ void MilvusServiceClient::recv_CreateTable()
return;
}
bool MilvusServiceClient::HasTable(const std::string& table_name)
{
send_HasTable(table_name);
return recv_HasTable();
}
void MilvusServiceClient::send_HasTable(const std::string& table_name)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_CALL, cseqid);
MilvusService_HasTable_pargs args;
args.table_name = &table_name;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
}
bool MilvusServiceClient::recv_HasTable()
{
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
iprot_->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
if (fname.compare("HasTable") != 0) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
bool _return;
MilvusService_HasTable_presult result;
result.success = &_return;
result.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
if (result.__isset.success) {
return _return;
}
if (result.__isset.e) {
throw result.e;
}
throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "HasTable failed: unknown result");
}
void MilvusServiceClient::DeleteTable(const std::string& table_name)
{
send_DeleteTable(table_name);
......@@ -2855,6 +3123,63 @@ void MilvusServiceProcessor::process_CreateTable(int32_t seqid, ::apache::thrift
}
}
void MilvusServiceProcessor::process_HasTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
{
void* ctx = NULL;
if (this->eventHandler_.get() != NULL) {
ctx = this->eventHandler_->getContext("MilvusService.HasTable", callContext);
}
::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "MilvusService.HasTable");
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preRead(ctx, "MilvusService.HasTable");
}
MilvusService_HasTable_args args;
args.read(iprot);
iprot->readMessageEnd();
uint32_t bytes = iprot->getTransport()->readEnd();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postRead(ctx, "MilvusService.HasTable", bytes);
}
MilvusService_HasTable_result result;
try {
result.success = iface_->HasTable(args.table_name);
result.__isset.success = true;
} catch (Exception &e) {
result.e = e;
result.__isset.e = true;
} catch (const std::exception& e) {
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->handlerError(ctx, "MilvusService.HasTable");
}
::apache::thrift::TApplicationException x(e.what());
oprot->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return;
}
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preWrite(ctx, "MilvusService.HasTable");
}
oprot->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot);
oprot->writeMessageEnd();
bytes = oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postWrite(ctx, "MilvusService.HasTable", bytes);
}
}
void MilvusServiceProcessor::process_DeleteTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
{
void* ctx = NULL;
......@@ -3399,6 +3724,94 @@ void MilvusServiceConcurrentClient::recv_CreateTable(const int32_t seqid)
} // end while(true)
}
bool MilvusServiceConcurrentClient::HasTable(const std::string& table_name)
{
int32_t seqid = send_HasTable(table_name);
return recv_HasTable(seqid);
}
int32_t MilvusServiceConcurrentClient::send_HasTable(const std::string& table_name)
{
int32_t cseqid = this->sync_.generateSeqId();
::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);
oprot_->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_CALL, cseqid);
MilvusService_HasTable_pargs args;
args.table_name = &table_name;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
sentry.commit();
return cseqid;
}
bool MilvusServiceConcurrentClient::recv_HasTable(const int32_t seqid)
{
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
// the read mutex gets dropped and reacquired as part of waitForWork()
// The destructor of this sentry wakes up other clients
::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);
while(true) {
if(!this->sync_.getPending(fname, mtype, rseqid)) {
iprot_->readMessageBegin(fname, mtype, rseqid);
}
if(seqid == rseqid) {
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
sentry.commit();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
if (fname.compare("HasTable") != 0) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
// in a bad state, don't commit
using ::apache::thrift::protocol::TProtocolException;
throw TProtocolException(TProtocolException::INVALID_DATA);
}
bool _return;
MilvusService_HasTable_presult result;
result.success = &_return;
result.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
if (result.__isset.success) {
sentry.commit();
return _return;
}
if (result.__isset.e) {
sentry.commit();
throw result.e;
}
// in a bad state, don't commit
throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "HasTable failed: unknown result");
}
// seqid != rseqid
this->sync_.updatePending(fname, mtype, rseqid);
// this will temporarily unlock the readMutex, and let other clients get work done
this->sync_.waitForWork(seqid);
} // end while(true)
}
void MilvusServiceConcurrentClient::DeleteTable(const std::string& table_name)
{
int32_t seqid = send_DeleteTable(table_name);
......
......@@ -34,6 +34,18 @@ class MilvusServiceIf {
*/
virtual void CreateTable(const TableSchema& param) = 0;
/**
* @brief Test table existence method
*
* This method is used to test table existence.
*
* @param table_name, table name is going to be tested.
*
*
* @param table_name
*/
virtual bool HasTable(const std::string& table_name) = 0;
/**
* @brief Delete table method
*
......@@ -178,6 +190,10 @@ class MilvusServiceNull : virtual public MilvusServiceIf {
void CreateTable(const TableSchema& /* param */) {
return;
}
bool HasTable(const std::string& /* table_name */) {
bool _return = false;
return _return;
}
void DeleteTable(const std::string& /* table_name */) {
return;
}
......@@ -309,6 +325,118 @@ class MilvusService_CreateTable_presult {
};
typedef struct _MilvusService_HasTable_args__isset {
_MilvusService_HasTable_args__isset() : table_name(false) {}
bool table_name :1;
} _MilvusService_HasTable_args__isset;
class MilvusService_HasTable_args {
public:
MilvusService_HasTable_args(const MilvusService_HasTable_args&);
MilvusService_HasTable_args& operator=(const MilvusService_HasTable_args&);
MilvusService_HasTable_args() : table_name() {
}
virtual ~MilvusService_HasTable_args() throw();
std::string table_name;
_MilvusService_HasTable_args__isset __isset;
void __set_table_name(const std::string& val);
bool operator == (const MilvusService_HasTable_args & rhs) const
{
if (!(table_name == rhs.table_name))
return false;
return true;
}
bool operator != (const MilvusService_HasTable_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_HasTable_args & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
class MilvusService_HasTable_pargs {
public:
virtual ~MilvusService_HasTable_pargs() throw();
const std::string* table_name;
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_HasTable_result__isset {
_MilvusService_HasTable_result__isset() : success(false), e(false) {}
bool success :1;
bool e :1;
} _MilvusService_HasTable_result__isset;
class MilvusService_HasTable_result {
public:
MilvusService_HasTable_result(const MilvusService_HasTable_result&);
MilvusService_HasTable_result& operator=(const MilvusService_HasTable_result&);
MilvusService_HasTable_result() : success(0) {
}
virtual ~MilvusService_HasTable_result() throw();
bool success;
Exception e;
_MilvusService_HasTable_result__isset __isset;
void __set_success(const bool val);
void __set_e(const Exception& val);
bool operator == (const MilvusService_HasTable_result & rhs) const
{
if (!(success == rhs.success))
return false;
if (!(e == rhs.e))
return false;
return true;
}
bool operator != (const MilvusService_HasTable_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_HasTable_result & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_HasTable_presult__isset {
_MilvusService_HasTable_presult__isset() : success(false), e(false) {}
bool success :1;
bool e :1;
} _MilvusService_HasTable_presult__isset;
class MilvusService_HasTable_presult {
public:
virtual ~MilvusService_HasTable_presult() throw();
bool* success;
Exception e;
_MilvusService_HasTable_presult__isset __isset;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
};
typedef struct _MilvusService_DeleteTable_args__isset {
_MilvusService_DeleteTable_args__isset() : table_name(false) {}
bool table_name :1;
......@@ -1269,6 +1397,9 @@ class MilvusServiceClient : virtual public MilvusServiceIf {
void CreateTable(const TableSchema& param);
void send_CreateTable(const TableSchema& param);
void recv_CreateTable();
bool HasTable(const std::string& table_name);
void send_HasTable(const std::string& table_name);
bool recv_HasTable();
void DeleteTable(const std::string& table_name);
void send_DeleteTable(const std::string& table_name);
void recv_DeleteTable();
......@@ -1309,6 +1440,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_;
void process_CreateTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_HasTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_DeleteTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_AddVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
......@@ -1321,6 +1453,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
MilvusServiceProcessor(::apache::thrift::stdcxx::shared_ptr<MilvusServiceIf> iface) :
iface_(iface) {
processMap_["CreateTable"] = &MilvusServiceProcessor::process_CreateTable;
processMap_["HasTable"] = &MilvusServiceProcessor::process_HasTable;
processMap_["DeleteTable"] = &MilvusServiceProcessor::process_DeleteTable;
processMap_["AddVector"] = &MilvusServiceProcessor::process_AddVector;
processMap_["SearchVector"] = &MilvusServiceProcessor::process_SearchVector;
......@@ -1366,6 +1499,15 @@ class MilvusServiceMultiface : virtual public MilvusServiceIf {
ifaces_[i]->CreateTable(param);
}
bool HasTable(const std::string& table_name) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
ifaces_[i]->HasTable(table_name);
}
return ifaces_[i]->HasTable(table_name);
}
void DeleteTable(const std::string& table_name) {
size_t sz = ifaces_.size();
size_t i = 0;
......@@ -1477,6 +1619,9 @@ class MilvusServiceConcurrentClient : virtual public MilvusServiceIf {
void CreateTable(const TableSchema& param);
int32_t send_CreateTable(const TableSchema& param);
void recv_CreateTable(const int32_t seqid);
bool HasTable(const std::string& table_name);
int32_t send_HasTable(const std::string& table_name);
bool recv_HasTable(const int32_t seqid);
void DeleteTable(const std::string& table_name);
int32_t send_DeleteTable(const std::string& table_name);
void recv_DeleteTable(const int32_t seqid);
......
......@@ -35,6 +35,21 @@ class MilvusServiceHandler : virtual public MilvusServiceIf {
printf("CreateTable\n");
}
/**
* @brief Test table existence method
*
* This method is used to test table existence.
*
* @param table_name, table name is going to be tested.
*
*
* @param table_name
*/
bool HasTable(const std::string& table_name) {
// Your implementation goes here
printf("HasTable\n");
}
/**
* @brief Delete table method
*
......
......@@ -15,23 +15,51 @@ namespace milvus { namespace thrift {
int _kErrorCodeValues[] = {
ErrorCode::SUCCESS,
ErrorCode::UNEXPECTED_ERROR,
ErrorCode::CONNECT_FAILED,
ErrorCode::PERMISSION_DENIED,
ErrorCode::TABLE_NOT_EXISTS,
ErrorCode::ILLEGAL_ARGUMENT,
ErrorCode::ILLEGAL_RANGE,
ErrorCode::ILLEGAL_DIMENSION
ErrorCode::ILLEGAL_DIMENSION,
ErrorCode::ILLEGAL_INDEX_TYPE,
ErrorCode::ILLEGAL_TABLE_NAME,
ErrorCode::ILLEGAL_TOPK,
ErrorCode::ILLEGAL_ROWRECORD,
ErrorCode::ILLEGAL_VECTOR_ID,
ErrorCode::ILLEGAL_SEARCH_RESULT,
ErrorCode::FILE_NOT_FOUND,
ErrorCode::META_FAILED,
ErrorCode::CACHE_FAILED,
ErrorCode::CANNOT_CREATE_FOLDER,
ErrorCode::CANNOT_CREATE_FILE,
ErrorCode::CANNOT_DELETE_FOLDER,
ErrorCode::CANNOT_DELETE_FILE
};
const char* _kErrorCodeNames[] = {
"SUCCESS",
"UNEXPECTED_ERROR",
"CONNECT_FAILED",
"PERMISSION_DENIED",
"TABLE_NOT_EXISTS",
"ILLEGAL_ARGUMENT",
"ILLEGAL_RANGE",
"ILLEGAL_DIMENSION"
"ILLEGAL_DIMENSION",
"ILLEGAL_INDEX_TYPE",
"ILLEGAL_TABLE_NAME",
"ILLEGAL_TOPK",
"ILLEGAL_ROWRECORD",
"ILLEGAL_VECTOR_ID",
"ILLEGAL_SEARCH_RESULT",
"FILE_NOT_FOUND",
"META_FAILED",
"CACHE_FAILED",
"CANNOT_CREATE_FOLDER",
"CANNOT_CREATE_FILE",
"CANNOT_DELETE_FOLDER",
"CANNOT_DELETE_FILE"
};
const std::map<int, const char*> _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
const std::map<int, const char*> _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
std::ostream& operator<<(std::ostream& out, const ErrorCode::type& val) {
std::map<int, const char*>::const_iterator it = _ErrorCode_VALUES_TO_NAMES.find(val);
......
......@@ -23,12 +23,26 @@ namespace milvus { namespace thrift {
struct ErrorCode {
enum type {
SUCCESS = 0,
CONNECT_FAILED = 1,
PERMISSION_DENIED = 2,
TABLE_NOT_EXISTS = 3,
ILLEGAL_ARGUMENT = 4,
ILLEGAL_RANGE = 5,
ILLEGAL_DIMENSION = 6
UNEXPECTED_ERROR = 1,
CONNECT_FAILED = 2,
PERMISSION_DENIED = 3,
TABLE_NOT_EXISTS = 4,
ILLEGAL_ARGUMENT = 5,
ILLEGAL_RANGE = 6,
ILLEGAL_DIMENSION = 7,
ILLEGAL_INDEX_TYPE = 8,
ILLEGAL_TABLE_NAME = 9,
ILLEGAL_TOPK = 10,
ILLEGAL_ROWRECORD = 11,
ILLEGAL_VECTOR_ID = 12,
ILLEGAL_SEARCH_RESULT = 13,
FILE_NOT_FOUND = 14,
META_FAILED = 15,
CACHE_FAILED = 16,
CANNOT_CREATE_FOLDER = 17,
CANNOT_CREATE_FILE = 18,
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20
};
};
......
......@@ -15,12 +15,26 @@ namespace netcore milvus.thrift
enum ErrorCode {
SUCCESS = 0,
UNEXPECTED_ERROR,
CONNECT_FAILED,
PERMISSION_DENIED,
TABLE_NOT_EXISTS,
ILLEGAL_ARGUMENT,
ILLEGAL_RANGE,
ILLEGAL_DIMENSION,
ILLEGAL_INDEX_TYPE,
ILLEGAL_TABLE_NAME,
ILLEGAL_TOPK,
ILLEGAL_ROWRECORD,
ILLEGAL_VECTOR_ID,
ILLEGAL_SEARCH_RESULT,
FILE_NOT_FOUND,
META_FAILED,
CACHE_FAILED,
CANNOT_CREATE_FOLDER,
CANNOT_CREATE_FILE,
CANNOT_DELETE_FOLDER,
CANNOT_DELETE_FILE,
}
exception Exception {
......@@ -80,6 +94,16 @@ service MilvusService {
*/
void CreateTable(2: TableSchema param) throws(1: Exception e);
/**
* @brief Test table existence method
*
* This method is used to test table existence.
*
* @param table_name, table name is going to be tested.
*
*/
bool HasTable(2: string table_name) throws(1: Exception e);
/**
* @brief Delete table method
......
......@@ -24,18 +24,35 @@ ToGlobalServerErrorCode(const ServerError error_code) {
return SERVER_ERROR_CODE_BASE + error_code;
}
constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(0x001);
constexpr ServerError SERVER_UNSUPPORTED_ERROR = ToGlobalServerErrorCode(0x002);
constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(0x003);
constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007);
constexpr ServerError SERVER_TABLE_NOT_EXIST = ToGlobalServerErrorCode(0x008);
constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(0x009);
constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(0x00a);
constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(0x00b);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(0x00c);
constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(1);
constexpr ServerError SERVER_UNSUPPORTED_ERROR = ToGlobalServerErrorCode(2);
constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(3);
constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(4);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(5);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(6);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(7);
constexpr ServerError SERVER_CANNOT_CREATE_FOLDER = ToGlobalServerErrorCode(8);
constexpr ServerError SERVER_CANNOT_CREATE_FILE = ToGlobalServerErrorCode(9);
constexpr ServerError SERVER_CANNOT_DELETE_FOLDER = ToGlobalServerErrorCode(10);
constexpr ServerError SERVER_CANNOT_DELETE_FILE = ToGlobalServerErrorCode(11);
constexpr ServerError SERVER_TABLE_NOT_EXIST = ToGlobalServerErrorCode(100);
constexpr ServerError SERVER_INVALID_TABLE_NAME = ToGlobalServerErrorCode(101);
constexpr ServerError SERVER_INVALID_TABLE_DIMENSION = ToGlobalServerErrorCode(102);
constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(103);
constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(104);
constexpr ServerError SERVER_INVALID_INDEX_TYPE = ToGlobalServerErrorCode(105);
constexpr ServerError SERVER_INVALID_ROWRECORD = ToGlobalServerErrorCode(106);
constexpr ServerError SERVER_INVALID_ROWRECORD_ARRAY = ToGlobalServerErrorCode(107);
constexpr ServerError SERVER_INVALID_TOPK = ToGlobalServerErrorCode(108);
constexpr ServerError SERVER_ILLEGAL_VECTOR_ID = ToGlobalServerErrorCode(109);
constexpr ServerError SERVER_ILLEGAL_SEARCH_RESULT = ToGlobalServerErrorCode(110);
constexpr ServerError SERVER_CACHE_ERROR = ToGlobalServerErrorCode(111);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(500);
constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(501);
constexpr ServerError DB_META_TRANSACTION_FAILED = ToGlobalServerErrorCode(1000);
class ServerException : public std::exception {
public:
......
......@@ -17,39 +17,39 @@
using namespace zilliz::milvus::engine;
TEST_F(MetaTest, GROUP_TEST) {
auto table_id = "meta_test_group";
TEST_F(MetaTest, TABLE_TEST) {
auto table_id = "meta_test_table";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl_->CreateTable(group);
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
ASSERT_TRUE(status.ok());
auto gid = group.id_;
group.id_ = -1;
status = impl_->DescribeTable(group);
auto gid = table.id_;
table.id_ = -1;
status = impl_->DescribeTable(table);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group.id_, gid);
ASSERT_EQ(group.table_id_, table_id);
ASSERT_EQ(table.id_, gid);
ASSERT_EQ(table.table_id_, table_id);
group.table_id_ = "not_found";
status = impl_->DescribeTable(group);
table.table_id_ = "not_found";
status = impl_->DescribeTable(table);
ASSERT_TRUE(!status.ok());
group.table_id_ = table_id;
status = impl_->CreateTable(group);
ASSERT_TRUE(!status.ok());
table.table_id_ = table_id;
status = impl_->CreateTable(table);
ASSERT_TRUE(status.ok());
}
TEST_F(MetaTest, table_file_TEST) {
auto table_id = "meta_test_group";
TEST_F(MetaTest, TABLE_FILE_TEST) {
auto table_id = "meta_test_table";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl_->CreateTable(group);
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
table_file.table_id_ = table.table_id_;
status = impl_->CreateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
......@@ -104,15 +104,15 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options);
auto table_id = "meta_test_group";
auto table_id = "meta_test_table";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl.CreateTable(table);
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
table_file.table_id_ = table.table_id_;
auto cnt = 100;
long ts = utils::GetMicroSecTimeStamp();
......@@ -156,13 +156,13 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
auto impl = meta::DBMetaImpl(options);
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl.CreateTable(group);
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl.CreateTable(table);
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
table_file.table_id_ = table.table_id_;
auto cnt = 10;
auto each_size = 2UL;
......@@ -198,9 +198,9 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
TEST_F(MetaTest, TABLE_FILES_TEST) {
auto table_id = "meta_test_group";
meta::TableSchema group;
group.table_id_ = table_id;
auto status = impl_->CreateTable(group);
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
int new_files_cnt = 4;
int raw_files_cnt = 5;
......@@ -208,7 +208,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
int index_files_cnt = 7;
meta::TableFileSchema table_file;
table_file.table_id_ = group.table_id_;
table_file.table_id_ = table.table_id_;
for (auto i=0; i<new_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
......@@ -241,7 +241,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedTableFilesSchema dated_files;
status = impl_->FilesToMerge(group.table_id_, dated_files);
status = impl_->FilesToMerge(table.table_id_, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册