提交 cbc63817 编写于 作者: P peng.xu

Merge branch 'yuncong' into 'branch-0.3.1-yuncong'

MS-197: add build index api

See merge request megasearch/milvus!179

Former-commit-id: 02da32414757e94f4e89e467bff6b1bdf595427d
......@@ -44,6 +44,8 @@ public:
virtual Status Size(uint64_t& result) = 0;
virtual Status BuildIndex(const std::string& table_id) = 0;
virtual Status DropAll() = 0;
DB() = default;
......
......@@ -430,6 +430,11 @@ void DBImpl::StartBuildIndexTask() {
}
}
Status DBImpl::BuildIndex(const std::string& table_id) {
meta_ptr_->UpdateTableFilesToIndex(table_id);
return BuildIndexByTable(table_id);
}
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(to_index == nullptr) {
......@@ -491,7 +496,27 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
return Status::OK();
}
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
return status;
}
ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed";
}
return status;
}
void DBImpl::BackgroundBuildIndex() {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
......
......@@ -82,6 +82,8 @@ class DBImpl : public DB {
Status Size(uint64_t &result) override;
Status BuildIndex(const std::string& table_id) override;
~DBImpl() override;
private:
......@@ -110,6 +112,8 @@ class DBImpl : public DB {
void StartBuildIndexTask();
void BackgroundBuildIndex();
Status
BuildIndexByTable(const std::string& table_id);
Status
BuildIndex(const meta::TableFileSchema &);
......@@ -132,6 +136,8 @@ class DBImpl : public DB {
server::ThreadPool index_thread_pool_;
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
}; // DBImpl
......
......@@ -791,6 +791,23 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
return Status::OK();
}
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
));
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files to to_index", e);
}
return Status::OK();
}
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
......
......@@ -35,6 +35,8 @@ public:
const std::vector<size_t>& ids,
TableFilesSchema& table_files) override;
virtual Status UpdateTableFilesToIndex(const std::string& table_id) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
......
......@@ -39,6 +39,8 @@ public:
const std::vector<size_t>& ids,
TableFilesSchema& table_files) = 0;
virtual Status UpdateTableFilesToIndex(const std::string& table_id) = 0;
virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0;
virtual Status UpdateTableFiles(TableFilesSchema& files) = 0;
......
......@@ -1479,6 +1479,11 @@ namespace meta {
return Status::OK();
}
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
// TODO
return Status::OK();
}
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
......
......@@ -42,6 +42,8 @@ namespace meta {
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
virtual Status UpdateTableFilesToIndex(const std::string& table_id) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
virtual Status FilesToSearch(const std::string& table_id,
......
......@@ -39,6 +39,12 @@ RequestHandler::DeleteTable(const std::string &table_name) {
RequestScheduler::ExecTask(task_ptr);
}
void
RequestHandler::BuildIndex(const std::string &table_name) {
BaseTaskPtr task_ptr = BuildIndexTask::Create(table_name);
RequestScheduler::ExecTask(task_ptr);
}
void
RequestHandler::AddVector(std::vector<int64_t> &_return,
const std::string &table_name,
......
......@@ -54,6 +54,18 @@ public:
*/
void DeleteTable(const std::string& table_name);
/**
* @brief build index by table method
*
* This method is used to build index by table in sync.
*
* @param table_name, table name is going to be built index.
*
*
* @param table_name
*/
void BuildIndex(const std::string &table_name);
/**
* @brief Add vector array to table
*
......
......@@ -12,7 +12,7 @@
namespace zilliz {
namespace milvus {
namespace server {
using namespace ::milvus;
namespace {
......@@ -43,6 +43,7 @@ namespace {
{SERVER_ILLEGAL_SEARCH_RESULT, thrift::ErrorCode::ILLEGAL_SEARCH_RESULT},
{SERVER_CACHE_ERROR, thrift::ErrorCode::CACHE_FAILED},
{DB_META_TRANSACTION_FAILED, thrift::ErrorCode::META_FAILED},
{SERVER_BUILD_INDEX_ERROR, thrift::ErrorCode::BUILD_INDEX_ERROR},
};
return code_map;
......
......@@ -133,7 +133,7 @@ BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
ServerError CreateTableTask::OnExecute() {
TimeRecorder rc("CreateTableTask");
try {
//step 1: check arguments
if(schema_.table_name.empty()) {
......@@ -213,6 +213,39 @@ ServerError DescribeTableTask::OnExecute() {
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BuildIndexTask::BuildIndexTask(const std::string& table_name)
: BaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name) {
}
BaseTaskPtr BuildIndexTask::Create(const std::string& table_name) {
return std::shared_ptr<BaseTask>(new BuildIndexTask(table_name));
}
ServerError BuildIndexTask::OnExecute() {
try {
TimeRecorder rc("BuildIndexTask");
//step 1: check arguments
if(table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
}
//step 2: check table existence
engine::Status stat = DBWrapper::DB()->BuildIndex(table_name_);
if(!stat.ok()) {
return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
}
rc.Elapse("totally cost");
} catch (std::exception& ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
: BaseTask(DDL_DML_TASK_GROUP),
......
......@@ -75,6 +75,21 @@ protected:
ServerError OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class BuildIndexTask : public BaseTask {
public:
static BaseTaskPtr Create(const std::string& table_name);
protected:
BuildIndexTask(const std::string& table_name);
ServerError OnExecute() override;
private:
std::string table_name_;
};
......@@ -174,4 +189,4 @@ private:
}
}
}
\ No newline at end of file
}
......@@ -590,6 +590,193 @@ uint32_t MilvusService_DeleteTable_presult::read(::apache::thrift::protocol::TPr
}
MilvusService_BuildIndex_args::~MilvusService_BuildIndex_args() throw() {
}
uint32_t MilvusService_BuildIndex_args::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 2:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->table_name);
this->__isset.table_name = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MilvusService_BuildIndex_args::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("MilvusService_BuildIndex_args");
xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeString(this->table_name);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_BuildIndex_pargs::~MilvusService_BuildIndex_pargs() throw() {
}
uint32_t MilvusService_BuildIndex_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("MilvusService_BuildIndex_pargs");
xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeString((*(this->table_name)));
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_BuildIndex_result::~MilvusService_BuildIndex_result() throw() {
}
uint32_t MilvusService_BuildIndex_result::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->e.read(iprot);
this->__isset.e = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MilvusService_BuildIndex_result::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("MilvusService_BuildIndex_result");
if (this->__isset.e) {
xfer += oprot->writeFieldBegin("e", ::apache::thrift::protocol::T_STRUCT, 1);
xfer += this->e.write(oprot);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
MilvusService_BuildIndex_presult::~MilvusService_BuildIndex_presult() throw() {
}
uint32_t MilvusService_BuildIndex_presult::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->e.read(iprot);
this->__isset.e = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
MilvusService_AddVector_args::~MilvusService_AddVector_args() throw() {
}
......@@ -2614,6 +2801,62 @@ void MilvusServiceClient::recv_DeleteTable()
return;
}
void MilvusServiceClient::BuildIndex(const std::string& table_name)
{
send_BuildIndex(table_name);
recv_BuildIndex();
}
void MilvusServiceClient::send_BuildIndex(const std::string& table_name)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin("BuildIndex", ::apache::thrift::protocol::T_CALL, cseqid);
MilvusService_BuildIndex_pargs args;
args.table_name = &table_name;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
}
void MilvusServiceClient::recv_BuildIndex()
{
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
iprot_->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
if (fname.compare("BuildIndex") != 0) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
MilvusService_BuildIndex_presult result;
result.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
if (result.__isset.e) {
throw result.e;
}
return;
}
void MilvusServiceClient::AddVector(std::vector<int64_t> & _return, const std::string& table_name, const std::vector<RowRecord> & record_array)
{
send_AddVector(table_name, record_array);
......@@ -3236,6 +3479,62 @@ void MilvusServiceProcessor::process_DeleteTable(int32_t seqid, ::apache::thrift
}
}
void MilvusServiceProcessor::process_BuildIndex(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
{
void* ctx = NULL;
if (this->eventHandler_.get() != NULL) {
ctx = this->eventHandler_->getContext("MilvusService.BuildIndex", callContext);
}
::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "MilvusService.BuildIndex");
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preRead(ctx, "MilvusService.BuildIndex");
}
MilvusService_BuildIndex_args args;
args.read(iprot);
iprot->readMessageEnd();
uint32_t bytes = iprot->getTransport()->readEnd();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postRead(ctx, "MilvusService.BuildIndex", bytes);
}
MilvusService_BuildIndex_result result;
try {
iface_->BuildIndex(args.table_name);
} catch (Exception &e) {
result.e = e;
result.__isset.e = true;
} catch (const std::exception& e) {
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->handlerError(ctx, "MilvusService.BuildIndex");
}
::apache::thrift::TApplicationException x(e.what());
oprot->writeMessageBegin("BuildIndex", ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return;
}
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preWrite(ctx, "MilvusService.BuildIndex");
}
oprot->writeMessageBegin("BuildIndex", ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot);
oprot->writeMessageEnd();
bytes = oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postWrite(ctx, "MilvusService.BuildIndex", bytes);
}
}
void MilvusServiceProcessor::process_AddVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
{
void* ctx = NULL;
......@@ -3894,6 +4193,88 @@ void MilvusServiceConcurrentClient::recv_DeleteTable(const int32_t seqid)
} // end while(true)
}
void MilvusServiceConcurrentClient::BuildIndex(const std::string& table_name)
{
int32_t seqid = send_BuildIndex(table_name);
recv_BuildIndex(seqid);
}
int32_t MilvusServiceConcurrentClient::send_BuildIndex(const std::string& table_name)
{
int32_t cseqid = this->sync_.generateSeqId();
::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);
oprot_->writeMessageBegin("BuildIndex", ::apache::thrift::protocol::T_CALL, cseqid);
MilvusService_BuildIndex_pargs args;
args.table_name = &table_name;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
sentry.commit();
return cseqid;
}
void MilvusServiceConcurrentClient::recv_BuildIndex(const int32_t seqid)
{
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
// the read mutex gets dropped and reacquired as part of waitForWork()
// The destructor of this sentry wakes up other clients
::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);
while(true) {
if(!this->sync_.getPending(fname, mtype, rseqid)) {
iprot_->readMessageBegin(fname, mtype, rseqid);
}
if(seqid == rseqid) {
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
sentry.commit();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
}
if (fname.compare("BuildIndex") != 0) {
iprot_->skip(::apache::thrift::protocol::T_STRUCT);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
// in a bad state, don't commit
using ::apache::thrift::protocol::TProtocolException;
throw TProtocolException(TProtocolException::INVALID_DATA);
}
MilvusService_BuildIndex_presult result;
result.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
if (result.__isset.e) {
sentry.commit();
throw result.e;
}
sentry.commit();
return;
}
// seqid != rseqid
this->sync_.updatePending(fname, mtype, rseqid);
// this will temporarily unlock the readMutex, and let other clients get work done
this->sync_.waitForWork(seqid);
} // end while(true)
}
void MilvusServiceConcurrentClient::AddVector(std::vector<int64_t> & _return, const std::string& table_name, const std::vector<RowRecord> & record_array)
{
int32_t seqid = send_AddVector(table_name, record_array);
......
......@@ -58,6 +58,18 @@ class MilvusServiceIf {
*/
virtual void DeleteTable(const std::string& table_name) = 0;
/**
* @brief Build index by table method
*
* This method is used to build index by table in sync mode.
*
* @param table_name, table is going to be built index.
*
*
* @param table_name
*/
virtual void BuildIndex(const std::string& table_name) = 0;
/**
* @brief Add vector array to table
*
......@@ -197,6 +209,9 @@ class MilvusServiceNull : virtual public MilvusServiceIf {
void DeleteTable(const std::string& /* table_name */) {
return;
}
void BuildIndex(const std::string& /* table_name */) {
return;
}
void AddVector(std::vector<int64_t> & /* _return */, const std::string& /* table_name */, const std::vector<RowRecord> & /* record_array */) {
return;
}
......@@ -541,6 +556,110 @@ class MilvusService_DeleteTable_presult {
};
typedef struct _MilvusService_BuildIndex_args__isset {
_MilvusService_BuildIndex_args__isset() : table_name(false) {}
bool table_name :1;
} _MilvusService_BuildIndex_args__isset;
class MilvusService_BuildIndex_args {
public:
MilvusService_BuildIndex_args(const MilvusService_BuildIndex_args&);
MilvusService_BuildIndex_args& operator=(const MilvusService_BuildIndex_args&);
MilvusService_BuildIndex_args() : table_name() {
}
virtual ~MilvusService_BuildIndex_args() throw();
std::string table_name;
_MilvusService_BuildIndex_args__isset __isset;
void __set_table_name(const std::string& val);
bool operator == (const MilvusService_BuildIndex_args & rhs) const
{
if (!(table_name == rhs.table_name))
return false;
return true;
}
bool operator != (const MilvusService_BuildIndex_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_BuildIndex_args & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
class MilvusService_BuildIndex_pargs {
public:
virtual ~MilvusService_BuildIndex_pargs() throw();
const std::string* table_name;
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_BuildIndex_result__isset {
_MilvusService_BuildIndex_result__isset() : e(false) {}
bool e :1;
} _MilvusService_BuildIndex_result__isset;
class MilvusService_BuildIndex_result {
public:
MilvusService_BuildIndex_result(const MilvusService_BuildIndex_result&);
MilvusService_BuildIndex_result& operator=(const MilvusService_BuildIndex_result&);
MilvusService_BuildIndex_result() {
}
virtual ~MilvusService_BuildIndex_result() throw();
Exception e;
_MilvusService_BuildIndex_result__isset __isset;
void __set_e(const Exception& val);
bool operator == (const MilvusService_BuildIndex_result & rhs) const
{
if (!(e == rhs.e))
return false;
return true;
}
bool operator != (const MilvusService_BuildIndex_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_BuildIndex_result & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_BuildIndex_presult__isset {
_MilvusService_BuildIndex_presult__isset() : e(false) {}
bool e :1;
} _MilvusService_BuildIndex_presult__isset;
class MilvusService_BuildIndex_presult {
public:
virtual ~MilvusService_BuildIndex_presult() throw();
Exception e;
_MilvusService_BuildIndex_presult__isset __isset;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
};
typedef struct _MilvusService_AddVector_args__isset {
_MilvusService_AddVector_args__isset() : table_name(false), record_array(false) {}
bool table_name :1;
......@@ -1403,6 +1522,9 @@ class MilvusServiceClient : virtual public MilvusServiceIf {
void DeleteTable(const std::string& table_name);
void send_DeleteTable(const std::string& table_name);
void recv_DeleteTable();
void BuildIndex(const std::string& table_name);
void send_BuildIndex(const std::string& table_name);
void recv_BuildIndex();
void AddVector(std::vector<int64_t> & _return, const std::string& table_name, const std::vector<RowRecord> & record_array);
void send_AddVector(const std::string& table_name, const std::vector<RowRecord> & record_array);
void recv_AddVector(std::vector<int64_t> & _return);
......@@ -1442,6 +1564,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
void process_CreateTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_HasTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_DeleteTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_BuildIndex(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_AddVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVectorInFiles(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
......@@ -1455,6 +1578,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
processMap_["CreateTable"] = &MilvusServiceProcessor::process_CreateTable;
processMap_["HasTable"] = &MilvusServiceProcessor::process_HasTable;
processMap_["DeleteTable"] = &MilvusServiceProcessor::process_DeleteTable;
processMap_["BuildIndex"] = &MilvusServiceProcessor::process_BuildIndex;
processMap_["AddVector"] = &MilvusServiceProcessor::process_AddVector;
processMap_["SearchVector"] = &MilvusServiceProcessor::process_SearchVector;
processMap_["SearchVectorInFiles"] = &MilvusServiceProcessor::process_SearchVectorInFiles;
......@@ -1517,6 +1641,15 @@ class MilvusServiceMultiface : virtual public MilvusServiceIf {
ifaces_[i]->DeleteTable(table_name);
}
void BuildIndex(const std::string& table_name) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
ifaces_[i]->BuildIndex(table_name);
}
ifaces_[i]->BuildIndex(table_name);
}
void AddVector(std::vector<int64_t> & _return, const std::string& table_name, const std::vector<RowRecord> & record_array) {
size_t sz = ifaces_.size();
size_t i = 0;
......@@ -1625,6 +1758,9 @@ class MilvusServiceConcurrentClient : virtual public MilvusServiceIf {
void DeleteTable(const std::string& table_name);
int32_t send_DeleteTable(const std::string& table_name);
void recv_DeleteTable(const int32_t seqid);
void BuildIndex(const std::string& table_name);
int32_t send_BuildIndex(const std::string& table_name);
void recv_BuildIndex(const int32_t seqid);
void AddVector(std::vector<int64_t> & _return, const std::string& table_name, const std::vector<RowRecord> & record_array);
int32_t send_AddVector(const std::string& table_name, const std::vector<RowRecord> & record_array);
void recv_AddVector(std::vector<int64_t> & _return, const int32_t seqid);
......
......@@ -65,6 +65,21 @@ class MilvusServiceHandler : virtual public MilvusServiceIf {
printf("DeleteTable\n");
}
/**
* @brief Build index by table method
*
* This method is used to build index by table in sync mode.
*
* @param table_name, table is going to be built index.
*
*
* @param table_name
*/
void BuildIndex(const std::string& table_name) {
// Your implementation goes here
printf("BuildIndex\n");
}
/**
* @brief Add vector array to table
*
......
......@@ -34,7 +34,8 @@ int _kErrorCodeValues[] = {
ErrorCode::CANNOT_CREATE_FOLDER,
ErrorCode::CANNOT_CREATE_FILE,
ErrorCode::CANNOT_DELETE_FOLDER,
ErrorCode::CANNOT_DELETE_FILE
ErrorCode::CANNOT_DELETE_FILE,
ErrorCode::BUILD_INDEX_ERROR
};
const char* _kErrorCodeNames[] = {
"SUCCESS",
......@@ -57,9 +58,10 @@ const char* _kErrorCodeNames[] = {
"CANNOT_CREATE_FOLDER",
"CANNOT_CREATE_FILE",
"CANNOT_DELETE_FOLDER",
"CANNOT_DELETE_FILE"
"CANNOT_DELETE_FILE",
"BUILD_INDEX_ERROR"
};
const std::map<int, const char*> _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
const std::map<int, const char*> _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
std::ostream& operator<<(std::ostream& out, const ErrorCode::type& val) {
std::map<int, const char*>::const_iterator it = _ErrorCode_VALUES_TO_NAMES.find(val);
......
......@@ -42,7 +42,8 @@ struct ErrorCode {
CANNOT_CREATE_FOLDER = 17,
CANNOT_CREATE_FILE = 18,
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20
CANNOT_DELETE_FILE = 20,
BUILD_INDEX_ERROR = 21
};
};
......
......@@ -35,6 +35,7 @@ enum ErrorCode {
CANNOT_CREATE_FILE,
CANNOT_DELETE_FOLDER,
CANNOT_DELETE_FILE,
BUILD_INDEX_ERROR,
}
exception Exception {
......@@ -115,6 +116,16 @@ service MilvusService {
*/
void DeleteTable(2: string table_name) throws(1: Exception e);
/**
* @brief Build index by table method
*
* This method is used to build index by table in sync mode.
*
* @param table_name, table is going to be built index.
*
*/
void BuildIndex(2: string table_name) throws(1: Exception e);
/**
* @brief Add vector array to table
......@@ -207,4 +218,4 @@ service MilvusService {
* @return Server status.
*/
string Ping(2: string cmd) throws(1: Exception e);
}
\ No newline at end of file
}
......@@ -35,6 +35,7 @@ constexpr ServerError SERVER_CANNOT_CREATE_FOLDER = ToGlobalServerErrorCode(8);
constexpr ServerError SERVER_CANNOT_CREATE_FILE = ToGlobalServerErrorCode(9);
constexpr ServerError SERVER_CANNOT_DELETE_FOLDER = ToGlobalServerErrorCode(10);
constexpr ServerError SERVER_CANNOT_DELETE_FILE = ToGlobalServerErrorCode(11);
constexpr ServerError SERVER_BUILD_INDEX_ERROR = ToGlobalServerErrorCode(12);
constexpr ServerError SERVER_TABLE_NOT_EXIST = ToGlobalServerErrorCode(100);
constexpr ServerError SERVER_INVALID_TABLE_NAME = ToGlobalServerErrorCode(101);
......@@ -54,6 +55,7 @@ constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(5
constexpr ServerError DB_META_TRANSACTION_FAILED = ToGlobalServerErrorCode(1000);
class ServerException : public std::exception {
public:
ServerException(ServerError error_code,
......@@ -77,4 +79,3 @@ private:
} // namespace server
} // namespace milvus
} // namespace zilliz
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册