提交 46a184f1 编写于 作者: W wxyu

Merge remote-tracking branch 'main/branch-0.4.0' into branch-0.4.0


Former-commit-id: 567d2b89bf0e202d39bcf81c9b1e8121dea254e4
...@@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-327 - Clean code for milvus - MS-327 - Clean code for milvus
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr
## Task ## Task
- MS-297 - disable mysql unit test - MS-297 - disable mysql unit test
......
...@@ -11,7 +11,7 @@ PROFILING="OFF" ...@@ -11,7 +11,7 @@ PROFILING="OFF"
BUILD_FAISS_WITH_MKL="OFF" BUILD_FAISS_WITH_MKL="OFF"
USE_JFROG_CACHE="OFF" USE_JFROG_CACHE="OFF"
KNOWHERE_OPTS="" KNOWHERE_OPTS=""
MILVUS_WITH_THRIFT="ON" MILVUS_WITH_THRIFT="OFF"
while getopts "p:d:t:uhlrcgmj" arg while getopts "p:d:t:uhlrcgmj" arg
do do
......
...@@ -90,7 +90,7 @@ define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) ...@@ -90,7 +90,7 @@ define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON)
define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON)
define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" OFF)
define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON)
......
...@@ -2679,7 +2679,7 @@ macro(build_grpc) ...@@ -2679,7 +2679,7 @@ macro(build_grpc)
add_dependencies(grpc_protoc grpc_ep) add_dependencies(grpc_protoc grpc_ep)
endmacro() endmacro()
#if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") if(NOT MILVUS_WITH_THRIFT STREQUAL "ON")
resolve_dependency(GRPC) resolve_dependency(GRPC)
get_target_property(GRPC_INCLUDE_DIR grpc INTERFACE_INCLUDE_DIRECTORIES) get_target_property(GRPC_INCLUDE_DIR grpc INTERFACE_INCLUDE_DIRECTORIES)
...@@ -2690,4 +2690,4 @@ endmacro() ...@@ -2690,4 +2690,4 @@ endmacro()
include_directories(SYSTEM ${GRPC_THIRD_PARTY_DIR}/protobuf/src) include_directories(SYSTEM ${GRPC_THIRD_PARTY_DIR}/protobuf/src)
link_directories(SYSTEM ${GRPC_PROTOBUF_LIB_DIR}) link_directories(SYSTEM ${GRPC_PROTOBUF_LIB_DIR})
#endif() endif()
#!/bin/bash #!/bin/bash
../bin/milvus_server -c ../conf/server_config.yaml -l ../conf/log_config.conf ../bin/milvus_grpc_server -c ../conf/server_config.yaml -l ../conf/log_config.conf
...@@ -35,6 +35,7 @@ set(license_generator_files ...@@ -35,6 +35,7 @@ set(license_generator_files
license/LicenseLibrary.cpp license/LicenseLibrary.cpp
) )
if (MILVUS_WITH_THRIFT STREQUAL "ON")
set(thrift_service_files set(thrift_service_files
thrift/gen-cpp/MilvusService.cpp thrift/gen-cpp/MilvusService.cpp
thrift/gen-cpp/milvus_constants.cpp thrift/gen-cpp/milvus_constants.cpp
...@@ -44,7 +45,7 @@ set(thrift_service_files ...@@ -44,7 +45,7 @@ set(thrift_service_files
server/thrift_impl/ThreadPoolServer.cpp server/thrift_impl/ThreadPoolServer.cpp
server/thrift_impl/ThreadPoolServer.h server/thrift_impl/ThreadPoolServer.h
) )
else()
set(grpc_service_files set(grpc_service_files
metrics/SystemInfo.cpp metrics/SystemInfo.cpp
metrics/SystemInfo.h metrics/SystemInfo.h
...@@ -53,6 +54,7 @@ set(grpc_service_files ...@@ -53,6 +54,7 @@ set(grpc_service_files
grpc/gen-status/status.grpc.pb.cc grpc/gen-status/status.grpc.pb.cc
grpc/gen-status/status.pb.cc grpc/gen-status/status.pb.cc
) )
endif()
set(db_files set(db_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
...@@ -77,17 +79,17 @@ include_directories(/usr/include/mysql) ...@@ -77,17 +79,17 @@ include_directories(/usr/include/mysql)
include_directories(grpc/gen-status) include_directories(grpc/gen-status)
include_directories(grpc/gen-milvus) include_directories(grpc/gen-milvus)
#if (MILVUS_WITH_THRIFT STREQUAL "ON") if (MILVUS_WITH_THRIFT STREQUAL "ON")
set(client_thrift_lib set(client_thrift_lib
thrift) thrift)
#else() else()
set(client_grpc_lib set(client_grpc_lib
grpcpp_channelz grpcpp_channelz
grpc++ grpc++
grpc grpc
grpc_protobuf grpc_protobuf
grpc_protoc) grpc_protoc)
#endif() endif()
set(third_party_libs set(third_party_libs
knowhere knowhere
...@@ -197,34 +199,25 @@ set(knowhere_libs ...@@ -197,34 +199,25 @@ set(knowhere_libs
tbb tbb
) )
#if (MILVUS_WITH_THRIFT STREQUAL "ON") if (MILVUS_WITH_THRIFT STREQUAL "ON")
# add_executable(milvus_thrift_server add_executable(milvus_thrift_server
# ${config_files} ${config_files}
# ${server_files} ${server_files}
# ${thriftserver_files} ${thriftserver_files}
# ${utils_files} ${utils_files}
# ${thrift_service_files} ${thrift_service_files}
# ${metrics_files} ${metrics_files}
# ) )
#else() else()
# add_executable(milvus_grpc_server add_executable(milvus_grpc_server
# ${config_files} ${config_files}
# ${server_files} ${server_files}
# ${grpcserver_files} ${grpcserver_files}
# ${utils_files} ${utils_files}
# ${grpc_service_files} ${grpc_service_files}
# ${metrics_files} ${metrics_files}
# ) )
#endif() endif()
add_executable(milvus_server
${config_files}
${server_files}
${thriftserver_files}
${grpcserver_files}
${utils_files}
${thrift_service_files}
${grpc_service_files}
${metrics_files})
if (ENABLE_LICENSE STREQUAL "ON") if (ENABLE_LICENSE STREQUAL "ON")
add_executable(get_sys_info ${get_sys_info_files}) add_executable(get_sys_info ${get_sys_info_files})
...@@ -233,28 +226,25 @@ if (ENABLE_LICENSE STREQUAL "ON") ...@@ -233,28 +226,25 @@ if (ENABLE_LICENSE STREQUAL "ON")
target_link_libraries(get_sys_info ${license_libs} license_check ${third_party_libs}) target_link_libraries(get_sys_info ${license_libs} license_check ${third_party_libs})
target_link_libraries(license_generator ${license_libs} ${third_party_libs}) target_link_libraries(license_generator ${license_libs} ${third_party_libs})
# if(MILVUS_WITH_THRIFT STREQUAL "ON") if(MILVUS_WITH_THRIFT STREQUAL "ON")
# target_link_libraries(milvus_thrift_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) target_link_libraries(milvus_thrift_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs})
# else() else()
# target_link_libraries(milvus_grpc_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) target_link_libraries(milvus_grpc_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs})
# endif() endif()
target_link_libraries(milvus_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs})
else () else ()
# if(MILVUS_WITH_THRIFT STREQUAL "ON") if(MILVUS_WITH_THRIFT STREQUAL "ON")
# target_link_libraries(milvus_thrift_server ${server_libs} ${knowhere_libs} ${third_party_libs}) target_link_libraries(milvus_thrift_server ${server_libs} ${knowhere_libs} ${third_party_libs})
# else() else()
# target_link_libraries(milvus_grpc_server ${server_libs} ${knowhere_libs} ${third_party_libs}) target_link_libraries(milvus_grpc_server ${server_libs} ${knowhere_libs} ${third_party_libs})
# endif() endif()
target_link_libraries(milvus_server ${server_libs} ${knowhere_libs} ${third_party_libs})
endif() endif()
#if (MILVUS_WITH_THRIFT STREQUAL "ON") if (MILVUS_WITH_THRIFT STREQUAL "ON")
# install(TARGETS milvus_thrift_server DESTINATION bin) install(TARGETS milvus_thrift_server DESTINATION bin)
#else() else()
# install(TARGETS milvus_grpc_server DESTINATION bin) install(TARGETS milvus_grpc_server DESTINATION bin)
#endif() endif()
install(TARGETS milvus_server DESTINATION bin)
install(FILES install(FILES
${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX} ${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX}
......
#!/bin/bash #!/bin/bash
protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` status.proto /home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" status.proto
protoc -I . --cpp_out=./gen-status status.proto /home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-status status.proto
protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` milvus.proto /home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" milvus.proto
protoc -I . --cpp_out=./gen-milvus milvus.proto /home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
\ No newline at end of file \ No newline at end of file
此差异已折叠。
...@@ -38,9 +38,9 @@ message RowRecord { ...@@ -38,9 +38,9 @@ message RowRecord {
} }
/** /**
* @brief Infos to be inserted * @brief params to be inserted
*/ */
message InsertInfos { message InsertParam {
string table_name = 1; string table_name = 1;
repeated RowRecord row_record_array = 2; repeated RowRecord row_record_array = 2;
} }
...@@ -54,9 +54,9 @@ message VectorIds { ...@@ -54,9 +54,9 @@ message VectorIds {
} }
/** /**
* @brief Infos for searching vector * @brief params for searching vector
*/ */
message SearchVectorInfos { message SearchParam {
string table_name = 1; string table_name = 1;
repeated RowRecord query_record_array = 2; repeated RowRecord query_record_array = 2;
repeated Range query_range_array = 3; repeated Range query_range_array = 3;
...@@ -64,15 +64,15 @@ message SearchVectorInfos { ...@@ -64,15 +64,15 @@ message SearchVectorInfos {
} }
/** /**
* @brief Infos for searching vector in files * @brief params for searching vector in files
*/ */
message SearchVectorInFilesInfos { message SearchInFilesParam {
repeated string file_id_array = 1; repeated string file_id_array = 1;
SearchVectorInfos search_vector_infos = 2; SearchParam search_param = 2;
} }
/** /**
* @brief Query result infos * @brief Query result params
*/ */
message QueryResult { message QueryResult {
int64 id = 1; int64 id = 1;
...@@ -119,11 +119,28 @@ message Command { ...@@ -119,11 +119,28 @@ message Command {
} }
/** /**
* @brief Give Server Command * @brief Index
*/ */
message ServerStatus{ message Index {
Status status = 1; int32 index_type = 1;
string info = 2; int64 nlist = 2;
int32 index_file_size = 3;
}
/**
* @brief Index params
*/
message IndexParam {
TableName table_name = 1;
Index index = 2;
}
/**
* @brief table name and range for DeleteByRange
*/
message DeleteByRangeParam {
Range range = 1;
string table_name = 2;
} }
service MilvusService { service MilvusService {
...@@ -165,7 +182,7 @@ service MilvusService { ...@@ -165,7 +182,7 @@ service MilvusService {
* @param table_name, table is going to be built index. * @param table_name, table is going to be built index.
* *
*/ */
rpc BuildIndex(TableName) returns (Status) {} rpc CreateIndex(IndexParam) returns (Status) {}
/** /**
* @brief Add vector array to table * @brief Add vector array to table
...@@ -177,7 +194,7 @@ service MilvusService { ...@@ -177,7 +194,7 @@ service MilvusService {
* *
* @return vector id array * @return vector id array
*/ */
rpc InsertVector(InsertInfos) returns (VectorIds) {} rpc Insert(InsertParam) returns (VectorIds) {}
/** /**
* @brief Query vector * @brief Query vector
...@@ -191,7 +208,7 @@ service MilvusService { ...@@ -191,7 +208,7 @@ service MilvusService {
* *
* @return query result array. * @return query result array.
*/ */
rpc SearchVector(SearchVectorInfos) returns (stream TopKQueryResult) {} rpc Search(SearchParam) returns (stream TopKQueryResult) {}
/** /**
* @brief Internal use query interface * @brief Internal use query interface
...@@ -205,7 +222,7 @@ service MilvusService { ...@@ -205,7 +222,7 @@ service MilvusService {
* *
* @return query result array. * @return query result array.
*/ */
rpc SearchVectorInFiles(SearchVectorInFilesInfos) returns (stream TopKQueryResult) {} rpc SearchInFiles(SearchInFilesParam) returns (stream TopKQueryResult) {}
/** /**
* @brief Get table schema * @brief Get table schema
...@@ -227,7 +244,7 @@ service MilvusService { ...@@ -227,7 +244,7 @@ service MilvusService {
* *
* @return table schema * @return table schema
*/ */
rpc GetTableRowCount(TableName) returns (TableRowCount) {} rpc CountTable(TableName) returns (TableRowCount) {}
/** /**
* @brief List all tables in database * @brief List all tables in database
...@@ -246,5 +263,42 @@ service MilvusService { ...@@ -246,5 +263,42 @@ service MilvusService {
* *
* @return Server status. * @return Server status.
*/ */
rpc Ping(Command) returns (ServerStatus) {} rpc Cmd(Command) returns (StringReply) {}
/**
* @brief delete table by range
*
* This method is used to delete vector by range
*
* @return rpc status.
*/
rpc DeleteByRange(DeleteByRangeParam) returns (Status) {}
/**
* @brief preload table
*
* This method is used to preload table
*
* @return Status.
*/
rpc PreloadTable(TableName) returns (Status) {}
/**
* @brief describe index
*
* This method is used to describe index
*
* @return Status.
*/
rpc DescribeIndex(TableName) returns (IndexParam) {}
/**
* @brief drop index
*
* This method is used to drop index
*
* @return Status.
*/
rpc DropIndex(TableName) returns (Status) {}
} }
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ResourceMgr.h"
#include "db/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
ResourceMgr::ResourceMgr()
: running_(false) {
}
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
std::lock_guard<std::mutex> lck(resources_mutex_);
if(running_) {
ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
return ret;
}
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
resource->RegisterOnStartUp([&] {
start_up_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnFinishTask([&] {
finish_task_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnCopyCompleted([&] {
copy_completed_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnTaskTableUpdated([&] {
task_table_updated_event_[index] = true;
event_cv_.notify_one();
});
return ret;
}
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
if (auto observe_a = res1.lock()) {
if (auto observe_b = res2.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
}
}
}
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock <std::mutex> lock(resources_mutex_);
event_cv_.wait(lock, [this] { return !resources_.empty(); });
if(!running_) {
break;
}
for (uint64_t i = 0; i < resources_.size(); ++i) {
ResourceWPtr res(resources_[i]);
if (start_up_event_[i]) {
on_start_up_(res);
start_up_event_[i] = false;
}
if (finish_task_event_[i]) {
on_finish_task_(res);
finish_task_event_[i] = false;
}
if (copy_completed_event_[i]) {
on_copy_completed_(res);
copy_completed_event_[i] = false;
}
if (task_table_updated_event_[i]) {
on_task_table_updated_(res);
task_table_updated_event_[i] = false;
}
}
}
}
void
ResourceMgr::Start() {
std::lock_guard<std::mutex> lck(resources_mutex_);
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
running_ = true;
}
void
ResourceMgr::Stop() {
std::lock_guard<std::mutex> lck(resources_mutex_);
running_ = false;
worker_thread_.join();
for (auto &resource : resources_) {
resource->Stop();
}
}
std::string
ResourceMgr::Dump() {
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
for (uint64_t i = 0; i < resources_.size(); ++i) {
str += "Resource No." + std::to_string(i) + ":\n";
//str += resources_[i]->Dump();
}
return str;
}
}
}
}
...@@ -9,7 +9,10 @@ ...@@ -9,7 +9,10 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <mutex>
#include <condition_variable>
#include "resource/Resource.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -17,7 +20,7 @@ namespace engine { ...@@ -17,7 +20,7 @@ namespace engine {
class ResourceMgr { class ResourceMgr {
public: public:
ResourceMgr() : running_(false) {} ResourceMgr();
/******** Management Interface ********/ /******** Management Interface ********/
...@@ -27,42 +30,24 @@ public: ...@@ -27,42 +30,24 @@ public:
* Functions only modify bool variable, like event trigger; * Functions only modify bool variable, like event trigger;
*/ */
ResourceWPtr ResourceWPtr
Add(ResourcePtr &&resource) { Add(ResourcePtr &&resource);
ResourceWPtr ret(resource);
resources_.emplace_back(resource);
// resource->RegisterOnStartUp([] {
// start_up_event_[index] = true;
// });
// resource.RegisterOnFinishTask([] {
// finish_task_event_[index] = true;
// });
return ret;
}
/* /*
* Create connection between A and B; * Create connection between A and B;
*/ */
void void
Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) { Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection);
if (auto observe_a = A.lock()) {
if (auto observe_b = B.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
}
}
}
/* /*
* Synchronous start all resource; * Synchronous start all resource;
* Last, start event process thread; * Last, start event process thread;
*/ */
void void
StartAll() { Start();
for (auto &resource : resources_) {
resource->Start(); void
} Stop();
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
}
// TODO: add stats interface(low) // TODO: add stats interface(low)
...@@ -89,13 +74,17 @@ public: ...@@ -89,13 +74,17 @@ public:
* Register on copy task data completed event; * Register on copy task data completed event;
*/ */
void void
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func); RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func) {
on_copy_completed_ = func;
}
/* /*
* Register on task table updated event; * Register on task table updated event;
*/ */
void void
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func); RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func) {
on_task_table_updated_ = func;
}
public: public:
/******** Utlitity Functions ********/ /******** Utlitity Functions ********/
...@@ -105,23 +94,16 @@ public: ...@@ -105,23 +94,16 @@ public:
private: private:
void void
EventProcess() { EventProcess();
while (running_) {
for (uint64_t i = 0; i < resources_.size(); ++i) {
if (start_up_event_[i]) {
on_start_up_(resources_[i]);
}
}
}
}
private: private:
bool running_; bool running_;
std::vector<ResourcePtr> resources_; std::vector<ResourcePtr> resources_;
mutable std::mutex resources_mutex_;
std::thread worker_thread_; std::thread worker_thread_;
std::condition_variable event_cv_;
std::vector<bool> start_up_event_; std::vector<bool> start_up_event_;
std::vector<bool> finish_task_event_; std::vector<bool> finish_task_event_;
std::vector<bool> copy_completed_event_; std::vector<bool> copy_completed_event_;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#!/bin/bash #!/bin/bash
./cmake_build/src/milvus_server -c ./conf/server_config.yaml -l ./conf/log_config.conf & ./cmake_build/src/milvus_grpc_server -c ./conf/server_config.yaml -l ./conf/log_config.conf &
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册