提交 4574f9d9 编写于 作者: J JinHai-CN

Merge branch '0.7.1'

......@@ -13,7 +13,7 @@ Please mark all change in change log and use the issue from GitHub
## Task
# Milvus 0.7.1 (TBD)
# Milvus 0.7.1 (2020-03-29)
## Bug
- \#1301 Data in WAL may be accidentally inserted into a new table with the same name.
......@@ -32,6 +32,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1735 Fix search out of memory with ivf_flat
- \#1747 Expected error status if search with partition_tag not existed
- \#1756 Fix memory exhausted during searching
- \#1781 Fix search hang with SQ8H
## Feature
- \#261 Integrate ANNOY into Milvus
......@@ -174,7 +175,7 @@ Please mark all change in change log and use the issue from GitHub
# Milvus 0.6.0 (2019-12-07)
## Bug
- \#228 memory usage increased slowly during searching vectors
- \#228 Memory usage increased slowly during searching vectors
- \#246 Exclude src/external folder from code coverage for jenkin ci
- \#248 Reside src/external in thirdparty
- \#316 Some files not merged after vectors added
......@@ -201,7 +202,7 @@ Please mark all change in change log and use the issue from GitHub
- \#523 Erase file data from cache once the file is marked as deleted
- \#527 faiss benchmark not compatible with faiss 1.6.0
- \#530 BuildIndex stop when do build index and search simultaneously
- \#532 assigin value to `table_name` from confest shell
- \#532 Assigin value to `table_name` from confest shell
- \#533 NSG build failed with MetricType Inner Product
- \#543 client raise exception in shards when search results is empty
- \#545 Avoid dead circle of build index thread when error occurs
......
......@@ -90,7 +90,7 @@ if (MILVUS_VERSION_MAJOR STREQUAL ""
OR MILVUS_VERSION_MINOR STREQUAL ""
OR MILVUS_VERSION_PATCH STREQUAL "")
message(WARNING "Failed to determine Milvus version from git branch name")
set(MILVUS_VERSION "0.7.0")
set(MILVUS_VERSION "0.7.1")
endif ()
message(STATUS "Build version = ${MILVUS_VERSION}")
......
......@@ -69,6 +69,9 @@ class Cache {
void
erase(const std::string& key);
bool
reserve(const int64_t size);
void
print();
......@@ -77,7 +80,13 @@ class Cache {
private:
void
free_memory();
insert_internal(const std::string& key, const ItemObj& item);
void
erase_internal(const std::string& key);
void
free_memory_internal(const int64_t target_size);
private:
std::string header_;
......
......@@ -13,8 +13,6 @@ namespace milvus {
namespace cache {
constexpr double DEFAULT_THRESHOLD_PERCENT = 0.7;
constexpr double WARNING_THRESHOLD_PERCENT = 0.9;
constexpr double BIG_ITEM_THRESHOLD_PERCENT = 0.1;
template <typename ItemObj>
Cache<ItemObj>::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header)
......@@ -28,9 +26,10 @@ Cache<ItemObj>::Cache(int64_t capacity, int64_t cache_max_count, const std::stri
template <typename ItemObj>
void
Cache<ItemObj>::set_capacity(int64_t capacity) {
std::lock_guard<std::mutex> lock(mutex_);
if (capacity > 0) {
capacity_ = capacity;
free_memory();
free_memory_internal(capacity);
}
}
......@@ -55,56 +54,95 @@ Cache<ItemObj>::get(const std::string& key) {
if (!lru_.exists(key)) {
return nullptr;
}
return lru_.get(key);
}
template <typename ItemObj>
void
Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
std::lock_guard<std::mutex> lock(mutex_);
insert_internal(key, item);
}
template <typename ItemObj>
void
Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
erase_internal(key);
}
template <typename ItemObj>
bool
Cache<ItemObj>::reserve(const int64_t item_size) {
std::lock_guard<std::mutex> lock(mutex_);
if (item_size > capacity_) {
SERVER_LOG_ERROR << header_ << " item size " << (item_size >> 20) << "MB too big to insert into cache capacity"
<< (capacity_ >> 20) << "MB";
return false;
}
if (item_size > capacity_ - usage_) {
free_memory_internal(capacity_ - item_size);
}
return true;
}
template <typename ItemObj>
void
Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << header_ << " Clear cache !";
}
template <typename ItemObj>
void
Cache<ItemObj>::print() {
std::lock_guard<std::mutex> lock(mutex_);
size_t cache_count = lru_.size();
// for (auto it = lru_.begin(); it != lru_.end(); ++it) {
// SERVER_LOG_DEBUG << it->first;
// }
SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20)
<< "MB, [capacity] " << (capacity_ >> 20) << "MB";
}
template <typename ItemObj>
void
Cache<ItemObj>::insert_internal(const std::string& key, const ItemObj& item) {
if (item == nullptr) {
return;
}
size_t item_size = item->Size();
// calculate usage
{
std::lock_guard<std::mutex> lock(mutex_);
// if key already exist, subtract old item size
if (lru_.exists(key)) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
}
// plus new item size
usage_ += item_size;
// if key already exist, subtract old item size
if (lru_.exists(key)) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
}
// plus new item size
usage_ += item_size;
// if usage exceed capacity, free some items
if (usage_ > capacity_ ||
(item_size > (int64_t)(capacity_ * BIG_ITEM_THRESHOLD_PERCENT) &&
usage_ > (int64_t)(capacity_ * WARNING_THRESHOLD_PERCENT))) {
if (usage_ > capacity_) {
SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity "
<< (capacity_ >> 20) << "MB, start free memory";
free_memory();
free_memory_internal(capacity_);
}
// insert new item
{
std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item);
SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache";
SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
<< (capacity_ >> 20) << "MB";
}
lru_.put(key, item);
SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache";
SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
<< (capacity_ >> 20) << "MB";
}
template <typename ItemObj>
void
Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
Cache<ItemObj>::erase_internal(const std::string& key) {
if (!lru_.exists(key)) {
return;
}
......@@ -122,22 +160,9 @@ Cache<ItemObj>::erase(const std::string& key) {
template <typename ItemObj>
void
Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << header_ << " Clear cache !";
}
/* free memory space when CACHE occupation exceed its capacity */
template <typename ItemObj>
void
Cache<ItemObj>::free_memory() {
// if (usage_ <= capacity_)
// return;
int64_t threshhold = capacity_ * freemem_percent_;
int64_t delta_size = usage_ - threshhold;
Cache<ItemObj>::free_memory_internal(const int64_t target_size) {
int64_t threshold = std::min((int64_t)(capacity_ * freemem_percent_), target_size);
int64_t delta_size = usage_ - threshold;
if (delta_size <= 0) {
delta_size = 1; // ensure at least one item erased
}
......@@ -145,44 +170,22 @@ Cache<ItemObj>::free_memory() {
std::set<std::string> key_array;
int64_t released_size = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = lru_.rbegin();
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
auto it = lru_.rbegin();
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
key_array.emplace(key);
released_size += obj_ptr->Size();
++it;
}
key_array.emplace(key);
released_size += obj_ptr->Size();
++it;
}
SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB";
for (auto& key : key_array) {
erase(key);
erase_internal(key);
}
}
template <typename ItemObj>
void
Cache<ItemObj>::print() {
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
cache_count = lru_.size();
#if 0
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
SERVER_LOG_DEBUG << it->first;
}
#endif
}
SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20)
<< "MB, [capacity] " << (capacity_ >> 20) << "MB";
}
} // namespace cache
} // namespace milvus
......@@ -39,6 +39,9 @@ class CacheMgr {
virtual void
EraseItem(const std::string& key);
virtual bool
Reserve(const int64_t size);
virtual void
PrintInfo();
......@@ -54,14 +57,20 @@ class CacheMgr {
void
SetCapacity(int64_t capacity);
void
Lock();
void
Unlock();
protected:
CacheMgr();
virtual ~CacheMgr();
protected:
using CachePtr = std::shared_ptr<Cache<ItemObj>>;
CachePtr cache_;
std::shared_ptr<Cache<ItemObj>> cache_;
std::mutex mutex_;
};
} // namespace cache
......
......@@ -27,7 +27,6 @@ CacheMgr<ItemObj>::ItemCount() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return (uint64_t)(cache_->size());
}
......@@ -38,7 +37,6 @@ CacheMgr<ItemObj>::ItemExists(const std::string& key) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
}
return cache_->exists(key);
}
......@@ -60,7 +58,6 @@ CacheMgr<ItemObj>::InsertItem(const std::string& key, const ItemObj& data) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->insert(key, data);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
......@@ -72,11 +69,20 @@ CacheMgr<ItemObj>::EraseItem(const std::string& key) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->erase(key);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
template <typename ItemObj>
bool
CacheMgr<ItemObj>::Reserve(const int64_t size) {
if (cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
}
return cache_->reserve(size);
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::PrintInfo() {
......@@ -84,7 +90,6 @@ CacheMgr<ItemObj>::PrintInfo() {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->print();
}
......@@ -95,7 +100,6 @@ CacheMgr<ItemObj>::ClearCache() {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->clear();
}
......@@ -106,7 +110,6 @@ CacheMgr<ItemObj>::CacheUsage() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return cache_->usage();
}
......@@ -117,7 +120,6 @@ CacheMgr<ItemObj>::CacheCapacity() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return cache_->capacity();
}
......@@ -131,5 +133,17 @@ CacheMgr<ItemObj>::SetCapacity(int64_t capacity) {
cache_->set_capacity(capacity);
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::Lock() {
mutex_.lock();
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::Unlock() {
mutex_.unlock();
}
} // namespace cache
} // namespace milvus
......@@ -22,7 +22,7 @@ namespace cache {
#ifdef MILVUS_GPU_VERSION
std::mutex GpuCacheMgr::global_mutex_;
std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> GpuCacheMgr::instance_;
std::unordered_map<int64_t, GpuCacheMgrPtr> GpuCacheMgr::instance_;
namespace {
constexpr int64_t G_BYTE = 1024 * 1024 * 1024;
......@@ -65,33 +65,28 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr&
}
}
GpuCacheMgrPtr
GpuCacheMgr::GetInstance(int64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(global_mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
}
}
return instance_[gpu_id].first;
bool
GpuCacheMgr::Reserve(const int64_t size) {
return CacheMgr<DataObjPtr>::Reserve(size);
}
MutexPtr
GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) {
GpuCacheMgrPtr
GpuCacheMgr::GetInstance(int64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(global_mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
instance_[gpu_id] = std::make_shared<GpuCacheMgr>(gpu_id);
}
}
return instance_[gpu_id].second;
return instance_[gpu_id];
}
void
GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) {
for (auto& iter : instance_) {
std::lock_guard<std::mutex> lock(*(iter.second.second));
iter.second.first->SetCapacity(capacity * G_BYTE);
iter.second->Lock();
iter.second->SetCapacity(capacity * G_BYTE);
iter.second->Unlock();
}
}
......
......@@ -39,12 +39,12 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr>, public server::GpuResourceConfi
void
InsertItem(const std::string& key, const DataObjPtr& data);
bool
Reserve(const int64_t size);
static GpuCacheMgrPtr
GetInstance(int64_t gpu_id);
static MutexPtr
GetInstanceMutex(int64_t gpu_id);
protected:
void
OnGpuCacheCapacityChanged(int64_t capacity) override;
......@@ -53,7 +53,7 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr>, public server::GpuResourceConfi
bool gpu_enable_ = true;
int64_t gpu_id_;
static std::mutex global_mutex_;
static std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> instance_;
static std::unordered_map<int64_t, GpuCacheMgrPtr> instance_;
std::string identity_;
};
#endif
......
......@@ -40,7 +40,8 @@ namespace server {
constexpr int64_t GB = 1UL << 30;
static const std::unordered_map<std::string, std::string> milvus_config_version_map({{"0.6.0", "0.1"},
{"0.7.0", "0.2"}});
{"0.7.0", "0.2"},
{"0.7.1", "0.2"}});
/////////////////////////////////////////////////////////////
Config::Config() {
......
......@@ -112,9 +112,6 @@ class ExecutionEngine {
virtual Status
Cache() = 0;
virtual Status
GpuCache(uint64_t gpu_id) = 0;
virtual Status
Init() = 0;
......
......@@ -41,6 +41,7 @@
#include "metrics/Metrics.h"
#include "scheduler/Utils.h"
#include "utils/CommonUtil.h"
#include "utils/Error.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/Status.h"
......@@ -549,13 +550,16 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
try {
/* Index data is copied to GPU first, then added into GPU cache.
* We MUST add a lock here to avoid more than one INDEX are copied to one GPU card at same time,
* which will potentially cause GPU out of memory.
* Add lock here to avoid multiple INDEX are copied to one GPU card at same time.
* And reserve space to avoid GPU out of memory issue.
*/
std::lock_guard<std::mutex> lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id)));
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start";
auto gpu_cache_mgr = cache::GpuCacheMgr::GetInstance(device_id);
// gpu_cache_mgr->Lock();
// gpu_cache_mgr->Reserve(index_->Size());
index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config());
GpuCache(device_id);
// gpu_cache_mgr->InsertItem(location_, std::static_pointer_cast<cache::DataObj>(index_));
// gpu_cache_mgr->Unlock();
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished";
} catch (std::exception& e) {
ENGINE_LOG_ERROR << e.what();
......@@ -572,10 +576,11 @@ ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
#ifdef MILVUS_GPU_VERSION
// the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in
if (index_) {
auto gpu_cache_mgr = milvus::cache::GpuCacheMgr::GetInstance(device_id);
gpu_cache_mgr->Lock();
gpu_num_ = device_id;
auto to_index_data = std::make_shared<knowhere::ToIndexData>(index_->Size());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj);
gpu_cache_mgr->Reserve(index_->Size());
gpu_cache_mgr->Unlock();
}
#endif
return Status::OK();
......@@ -958,18 +963,11 @@ ExecutionEngineImpl::GetVectorByID(const int64_t& id, uint8_t* vector, bool hybr
Status
ExecutionEngineImpl::Cache() {
auto cpu_cache_mgr = milvus::cache::CpuCacheMgr::GetInstance();
cpu_cache_mgr->Lock();
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK();
}
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
#ifdef MILVUS_GPU_VERSION
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
#endif
cpu_cache_mgr->InsertItem(location_, obj);
cpu_cache_mgr->Unlock();
return Status::OK();
}
......
......@@ -86,9 +86,6 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
Cache() override;
Status
GpuCache(uint64_t gpu_id) override;
Status
Init() override;
......
......@@ -211,7 +211,7 @@ class BaseRequest {
TableNotExistMsg(const std::string& table_name);
protected:
const std::shared_ptr<milvus::server::Context>& context_;
const std::shared_ptr<milvus::server::Context> context_;
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
......
......@@ -1388,7 +1388,7 @@ $ curl -X GET "http://127.0.0.1:19121/system/version" -H "accept: application/js
##### Response
```json
{"code":0,"message":"OK","reply": "0.7.0" }
{"code":0,"message":"OK","reply": "0.7.1" }
```
### `system/{op}` (PUT)
......
......@@ -203,8 +203,6 @@ TEST_F(EngineTest, ENGINE_IMPL_TEST) {
auto status = engine_ptr->CopyToGpu(0, false);
ASSERT_TRUE(status.ok());
status = engine_ptr->GpuCache(0);
ASSERT_TRUE(status.ok());
status = engine_ptr->CopyToGpu(0, false);
ASSERT_TRUE(status.ok());
......
......@@ -17,7 +17,7 @@
#include "grpc-gen/gen-milvus/milvus.grpc.pb.h"
#define MILVUS_SDK_VERSION "0.7.0";
#define MILVUS_SDK_VERSION "0.7.1";
namespace milvus {
......
import logging
import pytest
__version__ = '0.7.0'
__version__ = '0.7.1'
class TestPing:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册