提交 b1692c09 编写于 作者: G groot

Merge remote-tracking branch 'source/branch-0.5.0' into branch-0.5.0


Former-commit-id: 5a32cc531ec92537e2edad344ee66680d580870d
...@@ -5,6 +5,8 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -5,6 +5,8 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.5.0 (TODO) # Milvus 0.5.0 (TODO)
## Bug ## Bug
- MS-568 - Fix gpuresource free error
- MS-572 - Milvus crash when get SIGINT
## Improvement ## Improvement
- MS-552 - Add and change the easylogging library - MS-552 - Add and change the easylogging library
......
...@@ -4,9 +4,9 @@ Firstly, welcome, and thanks for your interest in [Milvus](https://milvus.io)! N ...@@ -4,9 +4,9 @@ Firstly, welcome, and thanks for your interest in [Milvus](https://milvus.io)! N
## What is Milvus ## What is Milvus
Milvus is an open source vector search engine that makes incredibly fast querying speed enhancement over current solutions of massive vector processing. Built on optimized indexing algorithm, it is compatible with major AI/ML models. Milvus is an open source vector search engine that supports similarity search of large-scale vectors. Built on optimized indexing algorithm, it is compatible with major AI/ML models.
Milvus was developed by researchers and engineers in ZILLIZ, a tech startup that intends to reinvent data science, with the purpose of providing enterprises with efficient and scalable similarity search and analysis of feature vectors and unstructured data. Milvus was developed by ZILLIZ, a tech startup that intends to reinvent data science, with the purpose of providing enterprises with efficient and scalable similarity search and analysis of feature vectors and unstructured data.
Milvus provides stable Python and C++ APIs, as well as RESTful API. Milvus provides stable Python and C++ APIs, as well as RESTful API.
...@@ -182,11 +182,11 @@ $ python3 example.py ...@@ -182,11 +182,11 @@ $ python3 example.py
Contributions are welcomed and greatly appreciated. If you want to contribute to Milvus, please read the [contribution guidelines](CONTRIBUTING.md). This project adheres to the [code of conduct](CODE OF CONDUCT.md) of Milvus. By participating, you are expected to uphold this code. Contributions are welcomed and greatly appreciated. If you want to contribute to Milvus, please read the [contribution guidelines](CONTRIBUTING.md). This project adheres to the [code of conduct](CODE OF CONDUCT.md) of Milvus. By participating, you are expected to uphold this code.
We use [GitHub issues] to track issues and bugs. For general questions and discussions, please go to [Milvus Forum]. We use [GitHub issues](https://github.com/milvus-io/milvus/issues) to track issues and bugs. For general questions and discussions, please go to [Milvus Forum].
## Join the Milvus community ## Join the Milvus community
For public discussion of Milvus, please join our [discussion group]. For public discussion of Milvus, please join our [discussion group](milvusio.slack.com).
## Milvus Roadmap ## Milvus Roadmap
...@@ -194,7 +194,7 @@ Please read our [roadmap](milvus-io/milvus/docs/master/roadmap.md) to learn abou ...@@ -194,7 +194,7 @@ Please read our [roadmap](milvus-io/milvus/docs/master/roadmap.md) to learn abou
## Resources ## Resources
[Milvus official website](https://milvus.io) [Milvus official website](https://www.milvus.io)
[Milvus docs](https://www.milvus.io/docs/en/QuickStart/) [Milvus docs](https://www.milvus.io/docs/en/QuickStart/)
...@@ -204,7 +204,6 @@ Please read our [roadmap](milvus-io/milvus/docs/master/roadmap.md) to learn abou ...@@ -204,7 +204,6 @@ Please read our [roadmap](milvus-io/milvus/docs/master/roadmap.md) to learn abou
[Milvus roadmap](https://www.milvus-io/docs/master/roadmap.md) [Milvus roadmap](https://www.milvus-io/docs/master/roadmap.md)
[Milvus white paper]
## License ## License
......
...@@ -48,6 +48,7 @@ set(index_srcs ...@@ -48,6 +48,7 @@ set(index_srcs
knowhere/index/vector_index/nsg/nsg_io.cpp knowhere/index/vector_index/nsg/nsg_io.cpp
knowhere/index/vector_index/nsg/utils.cpp knowhere/index/vector_index/nsg/utils.cpp
knowhere/index/vector_index/cloner.cpp knowhere/index/vector_index/cloner.cpp
knowhere/index/vector_index/FaissGpuResourceMgr.cpp
) )
set(depend_libs set(depend_libs
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "FaissGpuResourceMgr.h"
namespace zilliz {
namespace knowhere {
FaissGpuResourceMgr &FaissGpuResourceMgr::GetInstance() {
static FaissGpuResourceMgr instance;
return instance;
}
void FaissGpuResourceMgr::AllocateTempMem(ResPtr &resource,
const int64_t &device_id,
const int64_t &size) {
if (size) {
resource->faiss_res->setTempMemory(size);
}
else {
auto search = devices_params_.find(device_id);
if (search != devices_params_.end()) {
resource->faiss_res->setTempMemory(search->second.temp_mem_size);
}
// else do nothing. allocate when use.
}
}
void FaissGpuResourceMgr::InitDevice(int64_t device_id,
int64_t pin_mem_size,
int64_t temp_mem_size,
int64_t res_num) {
DeviceParams params;
params.pinned_mem_size = pin_mem_size;
params.temp_mem_size = temp_mem_size;
params.resource_num = res_num;
devices_params_.emplace(device_id, params);
}
void FaissGpuResourceMgr::InitResource() {
if(is_init) return ;
is_init = true;
//std::cout << "InitResource" << std::endl;
for(auto& device : devices_params_) {
auto& device_id = device.first;
mutex_cache_.emplace(device_id, std::make_unique<std::mutex>());
//std::cout << "Device Id: " << device_id << std::endl;
auto& device_param = device.second;
auto& bq = idle_map_[device_id];
for (int64_t i = 0; i < device_param.resource_num; ++i) {
//std::cout << "Resource Id: " << i << std::endl;
auto raw_resource = std::make_shared<faiss::gpu::StandardGpuResources>();
// TODO(linxj): enable set pinned memory
auto res_wrapper = std::make_shared<Resource>(raw_resource);
AllocateTempMem(res_wrapper, device_id, 0);
bq.Put(res_wrapper);
}
}
//std::cout << "End initResource" << std::endl;
}
ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
const int64_t &alloc_size) {
InitResource();
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
auto&& resource = bq.Take();
AllocateTempMem(resource, device_id, alloc_size);
return resource;
}
return nullptr;
}
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
bq.Put(res);
}
}
void FaissGpuResourceMgr::Free() {
for (auto &item : idle_map_) {
auto& bq = item.second;
while (!bq.Empty()) {
bq.Take();
}
}
is_init = false;
}
void
FaissGpuResourceMgr::Dump() {
for (auto &item : idle_map_) {
auto& bq = item.second;
std::cout << "device_id: " << item.first
<< ", resource count:" << bq.Size();
}
}
} // knowhere
} // zilliz
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <mutex>
#include <map>
#include <faiss/gpu/StandardGpuResources.h>
#include "src/utils/BlockingQueue.h"
namespace zilliz {
namespace knowhere {
struct Resource {
explicit Resource(std::shared_ptr<faiss::gpu::StandardGpuResources> &r) : faiss_res(r) {
static int64_t global_id = 0;
id = global_id++;
}
std::shared_ptr<faiss::gpu::StandardGpuResources> faiss_res;
int64_t id;
std::mutex mutex;
};
using ResPtr = std::shared_ptr<Resource>;
using ResWPtr = std::weak_ptr<Resource>;
class FaissGpuResourceMgr {
public:
friend class ResScope;
using ResBQ = zilliz::milvus::server::BlockingQueue<ResPtr>;
public:
struct DeviceParams {
int64_t temp_mem_size = 0;
int64_t pinned_mem_size = 0;
int64_t resource_num = 2;
};
public:
static FaissGpuResourceMgr &
GetInstance();
// Free gpu resource, avoid cudaGetDevice error when deallocate.
// this func should be invoke before main return
void
Free();
void
AllocateTempMem(ResPtr &resource, const int64_t& device_id, const int64_t& size);
void
InitDevice(int64_t device_id,
int64_t pin_mem_size = 0,
int64_t temp_mem_size = 0,
int64_t res_num = 2);
void
InitResource();
// allocate gpu memory invoke by build or copy_to_gpu
ResPtr
GetRes(const int64_t &device_id, const int64_t& alloc_size = 0);
void
MoveToIdle(const int64_t &device_id, const ResPtr& res);
void
Dump();
protected:
bool is_init = false;
std::map<int64_t ,std::unique_ptr<std::mutex>> mutex_cache_;
std::map<int64_t, DeviceParams> devices_params_;
std::map<int64_t, ResBQ> idle_map_;
};
class ResScope {
public:
ResScope(ResPtr &res, const int64_t& device_id, const bool& isown)
: resource(res), device_id(device_id), move(true), own(isown) {
Lock();
}
// specif for search
// get the ownership of gpuresource and gpu
ResScope(ResWPtr &res, const int64_t &device_id)
:device_id(device_id),move(false),own(true) {
resource = res.lock();
Lock();
}
void Lock() {
if (own) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
resource->mutex.lock();
}
~ResScope() {
if (own) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->unlock();
if (move) FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
resource->mutex.unlock();
}
private:
ResPtr resource; // hold resource until deconstruct
int64_t device_id;
bool move = true;
bool own = false;
};
} // knowhere
} // zilliz
\ No newline at end of file
...@@ -67,9 +67,9 @@ void GPUIVF::set_index_model(IndexModelPtr model) { ...@@ -67,9 +67,9 @@ void GPUIVF::set_index_model(IndexModelPtr model) {
auto host_index = std::static_pointer_cast<IVFIndexModel>(model); auto host_index = std::static_pointer_cast<IVFIndexModel>(model);
if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) { if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpures, gpu_id_, false); ResScope rs(gpures, gpu_id_, false);
res_ = gpures; auto device_index = faiss::gpu::index_cpu_to_gpu(gpures->faiss_res.get(), gpu_id_, host_index->index_.get());
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, host_index->index_.get());
index_.reset(device_index); index_.reset(device_index);
res_ = gpures;
} else { } else {
KNOWHERE_THROW_MSG("load index model error, can't get gpu_resource"); KNOWHERE_THROW_MSG("load index model error, can't get gpu_resource");
} }
...@@ -114,9 +114,9 @@ void GPUIVF::LoadImpl(const BinarySet &index_binary) { ...@@ -114,9 +114,9 @@ void GPUIVF::LoadImpl(const BinarySet &index_binary) {
if (auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) { if (auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(temp_res, gpu_id_, false); ResScope rs(temp_res, gpu_id_, false);
res_ = temp_res; auto device_index = faiss::gpu::index_cpu_to_gpu(temp_res->faiss_res.get(), gpu_id_, index);
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, index);
index_.reset(device_index); index_.reset(device_index);
res_ = temp_res;
} else { } else {
KNOWHERE_THROW_MSG("Load error, can't get gpu resource"); KNOWHERE_THROW_MSG("Load error, can't get gpu resource");
} }
...@@ -176,12 +176,13 @@ VectorIndexPtr GPUIVF::CopyGpuToGpu(const int64_t &device_id, const Config &conf ...@@ -176,12 +176,13 @@ VectorIndexPtr GPUIVF::CopyGpuToGpu(const int64_t &device_id, const Config &conf
auto host_index = CopyGpuToCpu(config); auto host_index = CopyGpuToCpu(config);
return std::static_pointer_cast<IVF>(host_index)->CopyCpuToGpu(device_id, config); return std::static_pointer_cast<IVF>(host_index)->CopyCpuToGpu(device_id, config);
} }
void GPUIVF::Add(const DatasetPtr &dataset, const Config &config) { void GPUIVF::Add(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_); if (auto spt = res_.lock()) {
if (temp_resource != nullptr) { ResScope rs(res_, gpu_id_);
ResScope rs(temp_resource, gpu_id_, true);
IVF::Add(dataset, config); IVF::Add(dataset, config);
} else { }
else {
KNOWHERE_THROW_MSG("Add IVF can't get gpu resource"); KNOWHERE_THROW_MSG("Add IVF can't get gpu resource");
} }
} }
...@@ -264,108 +265,6 @@ VectorIndexPtr GPUIVFSQ::CopyGpuToCpu(const Config &config) { ...@@ -264,108 +265,6 @@ VectorIndexPtr GPUIVFSQ::CopyGpuToCpu(const Config &config) {
return std::make_shared<IVFSQ>(new_index); return std::make_shared<IVFSQ>(new_index);
} }
FaissGpuResourceMgr &FaissGpuResourceMgr::GetInstance() {
static FaissGpuResourceMgr instance;
return instance;
}
void FaissGpuResourceMgr::AllocateTempMem(ResPtr &resource,
const int64_t &device_id,
const int64_t &size) {
if (size) {
resource->faiss_res->setTempMemory(size);
}
else {
auto search = devices_params_.find(device_id);
if (search != devices_params_.end()) {
resource->faiss_res->setTempMemory(search->second.temp_mem_size);
}
// else do nothing. allocate when use.
}
}
void FaissGpuResourceMgr::InitDevice(int64_t device_id,
int64_t pin_mem_size,
int64_t temp_mem_size,
int64_t res_num) {
DeviceParams params;
params.pinned_mem_size = pin_mem_size;
params.temp_mem_size = temp_mem_size;
params.resource_num = res_num;
devices_params_.emplace(device_id, params);
}
void FaissGpuResourceMgr::InitResource() {
if(is_init) return ;
is_init = true;
//std::cout << "InitResource" << std::endl;
for(auto& device : devices_params_) {
auto& device_id = device.first;
mutex_cache_.emplace(device_id, std::make_unique<std::mutex>());
//std::cout << "Device Id: " << device_id << std::endl;
auto& device_param = device.second;
auto& bq = idle_map_[device_id];
for (int64_t i = 0; i < device_param.resource_num; ++i) {
//std::cout << "Resource Id: " << i << std::endl;
auto raw_resource = std::make_shared<faiss::gpu::StandardGpuResources>();
// TODO(linxj): enable set pinned memory
auto res_wrapper = std::make_shared<Resource>(raw_resource);
AllocateTempMem(res_wrapper, device_id, 0);
bq.Put(res_wrapper);
}
}
//std::cout << "End initResource" << std::endl;
}
ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
const int64_t &alloc_size) {
InitResource();
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
auto&& resource = bq.Take();
AllocateTempMem(resource, device_id, alloc_size);
return resource;
}
return nullptr;
}
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
bq.Put(res);
}
}
void FaissGpuResourceMgr::Free() {
for (auto &item : idle_map_) {
auto& bq = item.second;
while (!bq.Empty()) {
bq.Take();
}
}
is_init = false;
}
void
FaissGpuResourceMgr::Dump() {
for (auto &item : idle_map_) {
auto& bq = item.second;
std::cout << "device_id: " << item.first
<< ", resource count:" << bq.Size();
}
}
void GPUIndex::SetGpuDevice(const int &gpu_id) { void GPUIndex::SetGpuDevice(const int &gpu_id) {
gpu_id_ = gpu_id; gpu_id_ = gpu_id;
} }
......
...@@ -18,118 +18,18 @@ ...@@ -18,118 +18,18 @@
#pragma once #pragma once
#include <faiss/gpu/StandardGpuResources.h>
#include "ivf.h" #include "ivf.h"
#include "src/utils/BlockingQueue.h" #include "FaissGpuResourceMgr.h"
namespace zilliz { namespace zilliz {
namespace knowhere { namespace knowhere {
struct Resource {
explicit Resource(std::shared_ptr<faiss::gpu::StandardGpuResources> &r): faiss_res(r) {
static int64_t global_id = 0;
id = global_id++;
}
std::shared_ptr<faiss::gpu::StandardGpuResources> faiss_res;
int64_t id;
std::mutex mutex;
};
using ResPtr = std::shared_ptr<Resource>;
using ResWPtr = std::weak_ptr<Resource>;
class FaissGpuResourceMgr {
public:
friend class ResScope;
public:
using ResBQ = zilliz::milvus::server::BlockingQueue<ResPtr>;
struct DeviceParams {
int64_t temp_mem_size = 0;
int64_t pinned_mem_size = 0;
int64_t resource_num = 2;
};
public:
static FaissGpuResourceMgr &
GetInstance();
// Free gpu resource, avoid cudaGetDevice error when deallocate.
// this func should be invoke before main return
void
Free();
void
AllocateTempMem(ResPtr &resource, const int64_t& device_id, const int64_t& size);
void
InitDevice(int64_t device_id,
int64_t pin_mem_size = 0,
int64_t temp_mem_size = 0,
int64_t res_num = 2);
void
InitResource();
// allocate gpu memory invoke by build or copy_to_gpu
ResPtr
GetRes(const int64_t &device_id, const int64_t& alloc_size = 0);
// allocate gpu memory before search
// this func will return True if the device is idle and exists an idle resource.
//bool
//GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0);
void
MoveToIdle(const int64_t &device_id, const ResPtr& res);
void
Dump();
protected:
bool is_init = false;
std::map<int64_t ,std::unique_ptr<std::mutex>> mutex_cache_;
std::map<int64_t, DeviceParams> devices_params_;
std::map<int64_t, ResBQ> idle_map_;
};
class ResScope {
public:
ResScope(ResPtr &res, const int64_t& device_id, const bool& isown)
: resource(res), device_id(device_id), move(true), own(isown) {
if (isown) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
// specif for search
// get the ownership of gpuresource and gpu
ResScope(ResPtr &res, const int64_t &device_id)
: resource(res), device_id(device_id), move(false), own(true) {
FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
~ResScope() {
if (own) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->unlock();
if (move) FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
resource->mutex.unlock();
}
private:
ResPtr resource;
int64_t device_id;
bool move = true;
bool own = false;
};
class GPUIndex { class GPUIndex {
public: public:
explicit GPUIndex(const int &device_id) : gpu_id_(device_id) {} explicit GPUIndex(const int &device_id) : gpu_id_(device_id) {}
GPUIndex(const int& device_id, ResPtr resource): gpu_id_(device_id), res_(std::move(resource)){} GPUIndex(const int& device_id, const ResPtr& resource): gpu_id_(device_id), res_(resource){}
virtual VectorIndexPtr CopyGpuToCpu(const Config &config) = 0; virtual VectorIndexPtr CopyGpuToCpu(const Config &config) = 0;
virtual VectorIndexPtr CopyGpuToGpu(const int64_t &device_id, const Config &config) = 0; virtual VectorIndexPtr CopyGpuToGpu(const int64_t &device_id, const Config &config) = 0;
...@@ -139,7 +39,7 @@ class GPUIndex { ...@@ -139,7 +39,7 @@ class GPUIndex {
protected: protected:
int64_t gpu_id_; int64_t gpu_id_;
ResPtr res_ = nullptr; ResWPtr res_;
}; };
class GPUIVF : public IVF, public GPUIndex { class GPUIVF : public IVF, public GPUIndex {
......
...@@ -224,9 +224,9 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) { ...@@ -224,9 +224,9 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){ if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){
ResScope rs(res, gpu_id_, false); ResScope rs(res, gpu_id_, false);
res_ = res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index); auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index);
index_.reset(device_index); index_.reset(device_index);
res_ = res;
} else { } else {
KNOWHERE_THROW_MSG("Load error, can't get gpu resource"); KNOWHERE_THROW_MSG("Load error, can't get gpu resource");
} }
......
...@@ -32,6 +32,7 @@ set(ivf_srcs ...@@ -32,6 +32,7 @@ set(ivf_srcs
${CORE_SOURCE_DIR}/knowhere/knowhere/adapter/structure.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/adapter/structure.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/common/exception.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/common/exception.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/common/timer.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/common/timer.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/index/vector_index/FaissGpuResourceMgr.cpp
utils.cpp utils.cpp
) )
if(NOT TARGET test_ivf) if(NOT TARGET test_ivf)
...@@ -48,6 +49,7 @@ set(idmap_srcs ...@@ -48,6 +49,7 @@ set(idmap_srcs
${CORE_SOURCE_DIR}/knowhere/knowhere/adapter/structure.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/adapter/structure.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/common/exception.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/common/exception.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/common/timer.cpp ${CORE_SOURCE_DIR}/knowhere/knowhere/common/timer.cpp
${CORE_SOURCE_DIR}/knowhere/knowhere/index/vector_index/FaissGpuResourceMgr.cpp
utils.cpp utils.cpp
) )
if(NOT TARGET test_idmap) if(NOT TARGET test_idmap)
......
...@@ -15,19 +15,20 @@ ...@@ -15,19 +15,20 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "server/Server.h"
#include "version.h"
#include <getopt.h> #include <getopt.h>
#include <libgen.h> #include <libgen.h>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <signal.h> #include <signal.h>
#include "utils/easylogging++.h" #include <unistd.h>
#include "metrics/Metrics.h"
#include "utils/easylogging++.h"
#include "utils/SignalUtil.h" #include "utils/SignalUtil.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "metrics/Metrics.h"
#include "server/Server.h"
#include "version.h"
INITIALIZE_EASYLOGGINGPP INITIALIZE_EASYLOGGINGPP
...@@ -40,12 +41,6 @@ main(int argc, char *argv[]) { ...@@ -40,12 +41,6 @@ main(int argc, char *argv[]) {
std::cout << std::endl << "Welcome to use Milvus by Zilliz!" << std::endl; std::cout << std::endl << "Welcome to use Milvus by Zilliz!" << std::endl;
std::cout << "Milvus " << BUILD_TYPE << " version: v" << MILVUS_VERSION << " built at " << BUILD_TIME << std::endl; std::cout << "Milvus " << BUILD_TYPE << " version: v" << MILVUS_VERSION << " built at " << BUILD_TIME << std::endl;
signal(SIGINT, server::SignalUtil::HandleSignal);
signal(SIGSEGV, server::SignalUtil::HandleSignal);
signal(SIGUSR1, server::SignalUtil::HandleSignal);
signal(SIGUSR2, server::SignalUtil::HandleSignal);
std::string app_name = basename(argv[0]);
static struct option long_options[] = {{"conf_file", required_argument, 0, 'c'}, static struct option long_options[] = {{"conf_file", required_argument, 0, 'c'},
{"log_conf_file", required_argument, 0, 'l'}, {"log_conf_file", required_argument, 0, 'l'},
{"help", no_argument, 0, 'h'}, {"help", no_argument, 0, 'h'},
...@@ -55,14 +50,12 @@ main(int argc, char *argv[]) { ...@@ -55,14 +50,12 @@ main(int argc, char *argv[]) {
int option_index = 0; int option_index = 0;
int64_t start_daemonized = 0; int64_t start_daemonized = 0;
// int pid_fd;
std::string config_filename, log_config_file; std::string config_filename, log_config_file;
std::string pid_filename; std::string pid_filename;
std::string app_name = argv[0];
app_name = argv[0]; if (argc < 2) {
if(argc < 2) {
print_help(app_name); print_help(app_name);
std::cout << "Milvus server exit..." << std::endl; std::cout << "Milvus server exit..." << std::endl;
return EXIT_FAILURE; return EXIT_FAILURE;
...@@ -109,14 +102,27 @@ main(int argc, char *argv[]) { ...@@ -109,14 +102,27 @@ main(int argc, char *argv[]) {
} }
} }
server::Server& server = server::Server::Instance(); server::Server &server = server::Server::Instance();
server.Init(start_daemonized, pid_filename, config_filename, log_config_file); server.Init(start_daemonized, pid_filename, config_filename, log_config_file);
return server.Start(); server.Start();
/* Handle Signal */
signal(SIGHUP, server::SignalUtil::HandleSignal);
signal(SIGINT, server::SignalUtil::HandleSignal);
signal(SIGUSR1, server::SignalUtil::HandleSignal);
signal(SIGSEGV, server::SignalUtil::HandleSignal);
signal(SIGUSR2, server::SignalUtil::HandleSignal);
signal(SIGTERM, server::SignalUtil::HandleSignal);
/* wait signal */
pause();
return 0;
} }
void void
print_help(const std::string &app_name) { print_help(const std::string &app_name) {
std::cout << std::endl<< "Usage: " << app_name << " [OPTIONS]" << std::endl << std::endl; std::cout << std::endl << "Usage: " << app_name << " [OPTIONS]" << std::endl << std::endl;
std::cout << " Options:" << std::endl; std::cout << " Options:" << std::endl;
std::cout << " -h --help Print this help" << std::endl; std::cout << " -h --help Print this help" << std::endl;
std::cout << " -c --conf_file filename Read configuration from the file" << std::endl; std::cout << " -c --conf_file filename Read configuration from the file" << std::endl;
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include <thread> #include <thread>
#include "Server.h" #include "Server.h"
#include "server/grpc_impl/GrpcMilvusServer.h" #include "server/grpc_impl/GrpcServer.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/LogUtil.h" #include "utils/LogUtil.h"
#include "utils/SignalUtil.h" #include "utils/SignalUtil.h"
...@@ -42,7 +42,7 @@ namespace zilliz { ...@@ -42,7 +42,7 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
Server& Server &
Server::Instance() { Server::Instance() {
static Server server; static Server server;
return server; return server;
...@@ -154,71 +154,61 @@ Server::Daemonize() { ...@@ -154,71 +154,61 @@ Server::Daemonize() {
} }
} }
int void
Server::Start() { Server::Start() {
if (daemonized_) { if (daemonized_) {
Daemonize(); Daemonize();
} }
do { try {
try { /* Read config file */
// Read config file if (LoadConfig() != SERVER_SUCCESS) {
if (LoadConfig() != SERVER_SUCCESS) { std::cerr << "Milvus server fail to load config file" << std::endl;
return 1; return;
} }
//log path is defined by LoadConfig, so InitLog must be called after LoadConfig /* log path is defined in Config file, so InitLog must be called after LoadConfig */
ServerConfig &config = ServerConfig::GetInstance(); ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER); ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
std::string time_zone = server_config.GetValue(CONFIG_TIME_ZONE, "UTC+8"); std::string time_zone = server_config.GetValue(CONFIG_TIME_ZONE, "UTC+8");
if (time_zone.length() == 3) { if (time_zone.length() == 3) {
time_zone = "CUT";
} else {
int time_bias = std::stoi(time_zone.substr(3, std::string::npos));
if (time_bias == 0)
time_zone = "CUT"; time_zone = "CUT";
else if (time_bias > 0) {
time_zone = "CUT" + std::to_string(-time_bias);
} else { } else {
int time_bias = std::stoi(time_zone.substr(3, std::string::npos)); time_zone = "CUT+" + std::to_string(-time_bias);
if (time_bias == 0)
time_zone = "CUT";
else if (time_bias > 0) {
time_zone = "CUT" + std::to_string(-time_bias);
} else {
time_zone = "CUT+" + std::to_string(-time_bias);
}
}
if (setenv("TZ", time_zone.c_str(), 1) != 0) {
return -1;
} }
tzset(); }
InitLog(log_config_file_); if (setenv("TZ", time_zone.c_str(), 1) != 0) {
std::cerr << "Fail to setenv" << std::endl;
return;
}
tzset();
// Handle Signal InitLog(log_config_file_);
signal(SIGINT, SignalUtil::HandleSignal);
signal(SIGHUP, SignalUtil::HandleSignal);
signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init();
std::cout << "Milvus server start successfully." << std::endl; server::Metrics::GetInstance().Init();
StartService(); server::SystemInfo::GetInstance().Init();
} catch (std::exception &ex) { std::cout << "Milvus server start successfully." << std::endl;
std::cerr << "Milvus server encounter exception: " << std::string(ex.what()) StartService();
<< "Is another server instance running?";
break;
}
} while (false);
Stop(); } catch (std::exception &ex) {
return 0; std::cerr << "Milvus server encounter exception: " << ex.what();
}
} }
void void
Server::Stop() { Server::Stop() {
std::cerr << "Milvus server is going to shutdown ..." << std::endl; std::cerr << "Milvus server is going to shutdown ..." << std::endl;
// Unlock and close lockfile /* Unlock and close lockfile */
if (pid_fd != -1) { if (pid_fd != -1) {
int ret = lockf(pid_fd, F_ULOCK, 0); int ret = lockf(pid_fd, F_ULOCK, 0);
if (ret != 0) { if (ret != 0) {
...@@ -232,7 +222,7 @@ Server::Stop() { ...@@ -232,7 +222,7 @@ Server::Stop() {
} }
} }
// Try to delete lockfile /* delete lockfile */
if (!pid_filename_.empty()) { if (!pid_filename_.empty()) {
int ret = unlink(pid_filename_.c_str()); int ret = unlink(pid_filename_.c_str());
if (ret != 0) { if (ret != 0) {
...@@ -264,12 +254,12 @@ Server::StartService() { ...@@ -264,12 +254,12 @@ Server::StartService() {
engine::KnowhereResource::Initialize(); engine::KnowhereResource::Initialize();
engine::StartSchedulerService(); engine::StartSchedulerService();
DBWrapper::GetInstance().StartService(); DBWrapper::GetInstance().StartService();
grpc::GrpcMilvusServer::StartService(); grpc::GrpcServer::GetInstance().Start();
} }
void void
Server::StopService() { Server::StopService() {
grpc::GrpcMilvusServer::StopService(); grpc::GrpcServer::GetInstance().Stop();
DBWrapper::GetInstance().StopService(); DBWrapper::GetInstance().StopService();
engine::StopSchedulerService(); engine::StopSchedulerService();
engine::KnowhereResource::Finalize(); engine::KnowhereResource::Finalize();
......
...@@ -22,24 +22,25 @@ ...@@ -22,24 +22,25 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
class Server { class Server {
public: public:
static Server &Instance(); static Server &Instance();
void Init(int64_t daemonized, const std::string &pid_filename, const std::string &config_filename, void Init(int64_t daemonized,
const std::string &pid_filename,
const std::string &config_filename,
const std::string &log_config_file); const std::string &log_config_file);
int Start(); void Start();
void Stop(); void Stop();
private: private:
Server(); Server();
~Server(); ~Server();
void Daemonize(); void Daemonize();
...@@ -47,10 +48,9 @@ private: ...@@ -47,10 +48,9 @@ private:
ErrorCode LoadConfig(); ErrorCode LoadConfig();
void StartService(); void StartService();
void StopService(); void StopService();
private: private:
int64_t daemonized_ = 0; int64_t daemonized_ = 0;
int pid_fd = -1; int pid_fd = -1;
std::string pid_filename_; std::string pid_filename_;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "utils/ValidationUtil.h" #include "utils/ValidationUtil.h"
#include "../DBWrapper.h" #include "../DBWrapper.h"
#include "version.h" #include "version.h"
#include "GrpcMilvusServer.h" #include "GrpcServer.h"
#include "db/Utils.h" #include "db/Utils.h"
#include "scheduler/SchedInst.h" #include "scheduler/SchedInst.h"
//#include <gperftools/profiler.h> //#include <gperftools/profiler.h>
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
// under the License. // under the License.
#include "milvus.grpc.pb.h" #include "milvus.grpc.pb.h"
#include "GrpcMilvusServer.h" #include "GrpcServer.h"
#include "server/ServerConfig.h" #include "server/ServerConfig.h"
#include "server/DBWrapper.h" #include "server/DBWrapper.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
#include <grpcpp/grpcpp.h>
namespace zilliz { namespace zilliz {
...@@ -42,7 +41,6 @@ namespace milvus { ...@@ -42,7 +41,6 @@ namespace milvus {
namespace server { namespace server {
namespace grpc { namespace grpc {
static std::unique_ptr<::grpc::Server> server;
constexpr long MESSAGE_SIZE = -1; constexpr long MESSAGE_SIZE = -1;
...@@ -53,18 +51,28 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption { ...@@ -53,18 +51,28 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption {
args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0); args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
} }
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>> * void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>> *plugins) override {
plugins) override {}
}
}; };
Status void
GrpcMilvusServer::StartService() { GrpcServer::Start() {
if (server != nullptr) { thread_ptr_ = std::make_shared<std::thread>(&GrpcServer::StartService, this);
std::cout << "stop service!\n"; }
StopService();
void
GrpcServer::Stop() {
StopService();
if (thread_ptr_) {
thread_ptr_->join();
thread_ptr_ = nullptr;
} }
}
Status
GrpcServer::StartService() {
ServerConfig &config = ServerConfig::GetInstance(); ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER); ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE); ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
...@@ -87,16 +95,16 @@ GrpcMilvusServer::StartService() { ...@@ -87,16 +95,16 @@ GrpcMilvusServer::StartService() {
builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials()); builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials());
builder.RegisterService(&service); builder.RegisterService(&service);
server = builder.BuildAndStart(); server_ptr_ = builder.BuildAndStart();
server->Wait(); server_ptr_->Wait();
return Status::OK(); return Status::OK();
} }
Status Status
GrpcMilvusServer::StopService() { GrpcServer::StopService() {
if (server != nullptr) { if (server_ptr_ != nullptr) {
server->Shutdown(); server_ptr_->Shutdown();
} }
return Status::OK(); return Status::OK();
......
...@@ -21,19 +21,35 @@ ...@@ -21,19 +21,35 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include <thread>
#include <grpcpp/grpcpp.h>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
namespace grpc { namespace grpc {
class GrpcMilvusServer { class GrpcServer {
public: public:
static Status static GrpcServer &GetInstance() {
StartService(); static GrpcServer grpc_server;
return grpc_server;
}
void Start();
void Stop();
private:
GrpcServer() = default;
~GrpcServer() = default;
Status StartService();
Status StopService();
static Status private:
StopService(); std::unique_ptr<::grpc::Server> server_ptr_;
std::shared_ptr<std::thread> thread_ptr_;
}; };
} }
......
...@@ -22,28 +22,29 @@ ...@@ -22,28 +22,29 @@
#include <signal.h> #include <signal.h>
#include <execinfo.h> #include <execinfo.h>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
void SignalUtil::HandleSignal(int signum){ void SignalUtil::HandleSignal(int signum) {
switch(signum){ switch (signum) {
case SIGINT: case SIGINT:
case SIGUSR2:{ case SIGUSR2: {
SERVER_LOG_INFO << "Server received signal:" << std::to_string(signum); SERVER_LOG_INFO << "Server received signal: " << signum;
server::Server& server_ptr = server::Server::Instance(); server::Server &server = server::Server::Instance();
server_ptr.Stop(); server.Stop();
exit(0); exit(0);
} }
default:{ default: {
SERVER_LOG_INFO << "Server received critical signal:" << std::to_string(signum); SERVER_LOG_INFO << "Server received critical signal: " << signum;
SignalUtil::PrintStacktrace(); SignalUtil::PrintStacktrace();
server::Server& server_ptr = server::Server::Instance(); server::Server &server = server::Server::Instance();
server_ptr.Stop(); server.Stop();
exit(1); exit(1);
} }
...@@ -54,9 +55,9 @@ void SignalUtil::PrintStacktrace() { ...@@ -54,9 +55,9 @@ void SignalUtil::PrintStacktrace() {
SERVER_LOG_INFO << "Call stack:"; SERVER_LOG_INFO << "Call stack:";
const int size = 32; const int size = 32;
void* array[size]; void *array[size];
int stack_num = backtrace(array, size); int stack_num = backtrace(array, size);
char ** stacktrace = backtrace_symbols(array, stack_num); char **stacktrace = backtrace_symbols(array, stack_num);
for (int i = 0; i < stack_num; ++i) { for (int i = 0; i < stack_num; ++i) {
std::string info = stacktrace[i]; std::string info = stacktrace[i];
SERVER_LOG_INFO << info; SERVER_LOG_INFO << info;
...@@ -64,7 +65,6 @@ void SignalUtil::PrintStacktrace() { ...@@ -64,7 +65,6 @@ void SignalUtil::PrintStacktrace() {
free(stacktrace); free(stacktrace);
} }
} }
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "KnowhereResource.h" #include "KnowhereResource.h"
#include "knowhere/index/vector_index/FaissGpuResourceMgr.h"
#include "server/ServerConfig.h" #include "server/ServerConfig.h"
#include <map> #include <map>
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#pragma once #pragma once
#include "utils/Error.h" #include "utils/Error.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册