提交 dc559ef2 编写于 作者: W wxyu

solve conflicts


Former-commit-id: 42e851d305e116053bc8cbf57dffc4161e66bc48
......@@ -7,9 +7,10 @@ Please mark all change in change log and use the ticket from JIRA.
## Bug
## Improvement
- MS-552 - Add and change the easylogging library
- MS-553 - Refine cache code
- MS-556 - Add Job Definition in Scheduler
## New Feature
## Task
......
......@@ -19,7 +19,6 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES
ARROW
BOOST
BZip2
Easylogging++
FAISS
GTest
Knowhere
......@@ -56,8 +55,6 @@ macro(build_dependency DEPENDENCY_NAME)
build_arrow()
elseif("${DEPENDENCY_NAME}" STREQUAL "BZip2")
build_bzip2()
elseif("${DEPENDENCY_NAME}" STREQUAL "Easylogging++")
build_easyloggingpp()
elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS")
build_faiss()
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
......@@ -295,13 +292,6 @@ else()
endif()
set(BZIP2_MD5 "00b516f4704d4a7cb50a1d97e6e8e15b")
if(DEFINED ENV{MILVUS_EASYLOGGINGPP_URL})
set(EASYLOGGINGPP_SOURCE_URL "$ENV{MILVUS_EASYLOGGINGPP_URL}")
else()
set(EASYLOGGINGPP_SOURCE_URL "https://github.com/zuhd-org/easyloggingpp/archive/${EASYLOGGINGPP_VERSION}.tar.gz")
endif()
set(EASYLOGGINGPP_MD5 "b78cd319db4be9b639927657b8aa7732")
if(DEFINED ENV{MILVUS_FAISS_URL})
set(FAISS_SOURCE_URL "$ENV{MILVUS_FAISS_URL}")
else()
......@@ -814,84 +804,6 @@ if(MILVUS_WITH_KNOWHERE)
include_directories(SYSTEM "${KNOWHERE_INCLUDE_DIR}/SPTAG/AnnService")
endif()
# ----------------------------------------------------------------------
# Easylogging++
macro(build_easyloggingpp)
message(STATUS "Building Easylogging++-${EASYLOGGINGPP_VERSION} from source")
set(EASYLOGGINGPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/easyloggingpp_ep-prefix/src/easyloggingpp_ep")
set(EASYLOGGINGPP_INCLUDE_DIR "${EASYLOGGINGPP_PREFIX}/include")
set(EASYLOGGINGPP_STATIC_LIB
"${EASYLOGGINGPP_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}easyloggingpp${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(EASYLOGGINGPP_CMAKE_ARGS
${EP_COMMON_CMAKE_ARGS}
"-DCMAKE_INSTALL_PREFIX=${EASYLOGGINGPP_PREFIX}"
-DCMAKE_INSTALL_LIBDIR=lib
-Dtest=OFF
-Dbuild_static_lib=ON)
if(USE_JFROG_CACHE STREQUAL "ON")
set(EASYLOGGINGPP_CACHE_PACKAGE_NAME "easyloggingpp_${EASYLOGGINGPP_MD5}.tar.gz")
set(EASYLOGGINGPP_CACHE_URL "${JFROG_ARTFACTORY_CACHE_URL}/${EASYLOGGINGPP_CACHE_PACKAGE_NAME}")
set(EASYLOGGINGPP_CACHE_PACKAGE_PATH "${THIRDPARTY_PACKAGE_CACHE}/${EASYLOGGINGPP_CACHE_PACKAGE_NAME}")
execute_process(COMMAND wget -q --method HEAD ${EASYLOGGINGPP_CACHE_URL} RESULT_VARIABLE return_code)
message(STATUS "Check the remote cache file ${EASYLOGGINGPP_CACHE_URL}. return code = ${return_code}")
if (NOT return_code EQUAL 0)
externalproject_add(easyloggingpp_ep
URL
${EASYLOGGINGPP_SOURCE_URL}
${EP_LOG_OPTIONS}
CMAKE_ARGS
${EASYLOGGINGPP_CMAKE_ARGS}
BUILD_COMMAND
${MAKE}
${MAKE_BUILD_ARGS}
BUILD_BYPRODUCTS
${EASYLOGGINGPP_STATIC_LIB})
ExternalProject_Create_Cache(easyloggingpp_ep ${EASYLOGGINGPP_CACHE_PACKAGE_PATH} "${CMAKE_CURRENT_BINARY_DIR}/easyloggingpp_ep-prefix" ${JFROG_USER_NAME} ${JFROG_PASSWORD} ${EASYLOGGINGPP_CACHE_URL})
else()
file(DOWNLOAD ${EASYLOGGINGPP_CACHE_URL} ${EASYLOGGINGPP_CACHE_PACKAGE_PATH} STATUS status)
list(GET status 0 status_code)
message(STATUS "DOWNLOADING FROM ${EASYLOGGINGPP_CACHE_URL} TO ${EASYLOGGINGPP_CACHE_PACKAGE_PATH}. STATUS = ${status_code}")
if (status_code EQUAL 0)
ExternalProject_Use_Cache(easyloggingpp_ep ${EASYLOGGINGPP_CACHE_PACKAGE_PATH} ${CMAKE_CURRENT_BINARY_DIR})
endif()
endif()
else()
externalproject_add(easyloggingpp_ep
URL
${EASYLOGGINGPP_SOURCE_URL}
${EP_LOG_OPTIONS}
CMAKE_ARGS
${EASYLOGGINGPP_CMAKE_ARGS}
BUILD_COMMAND
${MAKE}
${MAKE_BUILD_ARGS}
BUILD_BYPRODUCTS
${EASYLOGGINGPP_STATIC_LIB})
endif()
file(MAKE_DIRECTORY "${EASYLOGGINGPP_INCLUDE_DIR}")
add_library(easyloggingpp STATIC IMPORTED)
set_target_properties(
easyloggingpp
PROPERTIES IMPORTED_LOCATION "${EASYLOGGINGPP_STATIC_LIB}"
INTERFACE_INCLUDE_DIRECTORIES "${EASYLOGGINGPP_INCLUDE_DIR}")
add_dependencies(easyloggingpp easyloggingpp_ep)
endmacro()
if(MILVUS_WITH_EASYLOGGINGPP)
resolve_dependency(Easylogging++)
get_target_property(EASYLOGGINGPP_INCLUDE_DIR easyloggingpp INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM "${EASYLOGGINGPP_PREFIX}/lib")
include_directories(SYSTEM "${EASYLOGGINGPP_INCLUDE_DIR}")
endif()
# ----------------------------------------------------------------------
# OpenBLAS
......
......@@ -75,6 +75,7 @@ set(db_files
${db_scheduler_files}
${metrics_files}
${knowhere_files}
${utils_files}
)
set(s3_client_files
......@@ -96,7 +97,6 @@ set(client_grpc_lib
set(third_party_libs
knowhere
easyloggingpp
sqlite
${client_grpc_lib}
yaml-cpp
......@@ -147,7 +147,6 @@ target_link_libraries(milvus_engine ${engine_libs} ${third_party_libs})
add_library(metrics STATIC ${metrics_files})
set(metrics_lib
easyloggingpp
yaml-cpp
prometheus-cpp-push
prometheus-cpp-pull
......
......@@ -6,35 +6,20 @@
#pragma once
#include "LRU.h"
#include "utils/Log.h"
#include <string>
#include <mutex>
#include <atomic>
#include "LRU.h"
#include "DataObj.h"
#include <set>
namespace zilliz {
namespace milvus {
namespace cache {
const std::string SWAP_DIR = ".CACHE";
template<typename ItemObj>
class Cache {
private:
class CacheObj {
public:
CacheObj() = delete;
CacheObj(const DataObjPtr& data)
: data_(data) {
}
public:
DataObjPtr data_ = nullptr;
};
using CacheObjPtr = std::shared_ptr<CacheObj>;
public:
//mem_capacity, units:GB
Cache(int64_t capacity_gb, uint64_t cache_max_count);
......@@ -49,11 +34,13 @@ public:
size_t size() const;
bool exists(const std::string& key);
DataObjPtr get(const std::string& key);
void insert(const std::string& key, const DataObjPtr& data);
ItemObj get(const std::string& key);
void insert(const std::string& key, const ItemObj& item);
void erase(const std::string& key);
void print();
void clear();
private:
void free_memory();
private:
......@@ -61,13 +48,12 @@ private:
int64_t capacity_;
double freemem_percent_;
LRU<std::string, CacheObjPtr> lru_;
LRU<std::string, ItemObj> lru_;
mutable std::mutex mutex_;
};
using CachePtr = std::shared_ptr<Cache>;
} // cache
} // milvus
} // zilliz
#include "cache/Cache.inl"
\ No newline at end of file
......@@ -4,10 +4,6 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "Cache.h"
#include "utils/Log.h"
#include <set>
namespace zilliz {
namespace milvus {
......@@ -15,7 +11,8 @@ namespace cache {
constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85;
Cache::Cache(int64_t capacity, uint64_t cache_max_count)
template<typename ItemObj>
Cache<ItemObj>::Cache(int64_t capacity, uint64_t cache_max_count)
: usage_(0),
capacity_(capacity),
freemem_percent_(DEFAULT_THRESHHOLD_PERCENT),
......@@ -23,142 +20,106 @@ Cache::Cache(int64_t capacity, uint64_t cache_max_count)
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
void Cache::set_capacity(int64_t capacity) {
template<typename ItemObj>
void Cache<ItemObj>::set_capacity(int64_t capacity) {
if(capacity > 0) {
capacity_ = capacity;
free_memory();
}
}
size_t Cache::size() const {
template<typename ItemObj>
size_t Cache<ItemObj>::size() const {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.size();
}
bool Cache::exists(const std::string& key) {
template<typename ItemObj>
bool Cache<ItemObj>::exists(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.exists(key);
}
DataObjPtr Cache::get(const std::string& key) {
template<typename ItemObj>
ItemObj Cache<ItemObj>::get(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return nullptr;
}
const CacheObjPtr& cache_obj = lru_.get(key);
return cache_obj->data_;
return lru_.get(key);
}
void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
template<typename ItemObj>
void Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
if(item == nullptr) {
return;
}
// if(item->size() > capacity_) {
// SERVER_LOG_ERROR << "Item size " << item->size()
// << " is too large to insert into cache, capacity " << capacity_;
// return;
// }
//calculate usage
{
std::lock_guard<std::mutex> lock(mutex_);
/* if key already exist, over-write old data */
//if key already exist, subtract old item size
if (lru_.exists(key)) {
CacheObjPtr obj_ptr = lru_.get(key);
usage_ -= obj_ptr->data_->size();
obj_ptr->data_ = data_ptr;
usage_ += data_ptr->size();
} else {
CacheObjPtr obj_ptr(new CacheObj(data_ptr));
lru_.put(key, obj_ptr);
usage_ += data_ptr->size();
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->size();
}
SERVER_LOG_DEBUG << "Insert " << key << " size:" << data_ptr->size()
<< " bytes into cache, usage: " << usage_ << " bytes";
//plus new item size
usage_ += item->size();
}
//if usage exceed capacity, free some items
if (usage_ > capacity_) {
SERVER_LOG_DEBUG << "Current usage " << usage_
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
free_memory();
}
//insert new item
{
std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item);
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size()
<< " bytes into cache, usage: " << usage_ << " bytes";
}
}
void Cache::erase(const std::string& key) {
template<typename ItemObj>
void Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return;
}
const CacheObjPtr& obj_ptr = lru_.get(key);
const DataObjPtr& data_ptr = obj_ptr->data_;
usage_ -= data_ptr->size();
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << data_ptr->size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size();
lru_.erase(key);
}
void Cache::clear() {
template<typename ItemObj>
void Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << "Clear cache !";
}
#if 0 /* caiyd 20190221, need more testing before enable */
void Cache::flush_to_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
if (data_ptr->ptr() == nullptr) return;
std::string name = std::to_string(reinterpret_cast<int64_t>(data_ptr.get()));
filesys::CreateDirectory(this->swap_path_);
/* write cache data to file */
obj_ptr->set_file_path(this->swap_path_ + "/" + name);
std::shared_ptr<arrow::io::OutputStream> outfile = nullptr;
filesys::OpenWritableFile(obj_ptr->file_path(), false, &outfile);
filesys::WriteFile(outfile, data_ptr->ptr().get(), data_ptr->size());
(void)outfile->Close();
AGENT_LOG_DEBUG << "Flush cache data: " << key << ", to file: " << obj_ptr->file_path();
/* free cache memory */
data_ptr->ptr().reset();
usage_ -= data_ptr->size();
}
void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
std::shared_ptr<arrow::io::RandomAccessFile> infile = nullptr;
int64_t file_size, bytes_read;
/* load cache data from file */
if (!filesys::FileExist(obj_ptr->file_path())) {
THROW_AGENT_UNEXPECTED_ERROR("File not exist: " + obj_ptr->file_path());
}
filesys::OpenReadableFile(obj_ptr->file_path(), &infile);
infile->GetSize(&file_size);
if (data_ptr->size() != file_size) {
THROW_AGENT_UNEXPECTED_ERROR("File size not match: " + obj_ptr->file_path());
}
data_ptr->set_ptr(lib::gpu::MakeShared<char>(data_ptr->size(), lib::gpu::MallocHint::kUnifiedGlobal));
infile->Read(file_size, &bytes_read, data_ptr->ptr().get());
infile->Close();
AGENT_LOG_DEBUG << "Restore cache data: " << key << ", from file: " << obj_ptr->file_path();
/* clear file path */
obj_ptr->set_file_path("");
usage_ += data_ptr->size();
}
#endif
/* free memory space when CACHE occupation exceed its capacity */
void Cache::free_memory() {
template<typename ItemObj>
void Cache<ItemObj>::free_memory() {
if (usage_ <= capacity_) return;
int64_t threshhold = capacity_ * freemem_percent_;
......@@ -177,10 +138,9 @@ void Cache::free_memory() {
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
key_array.emplace(key);
released_size += data_ptr->size();
released_size += obj_ptr->size();
++it;
}
}
......@@ -194,7 +154,8 @@ void Cache::free_memory() {
print();
}
void Cache::print() {
template<typename ItemObj>
void Cache<ItemObj>::print() {
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
......
......@@ -7,22 +7,23 @@
#pragma once
#include "Cache.h"
#include "utils/Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace cache {
template<typename ItemObj>
class CacheMgr {
public:
virtual uint64_t ItemCount() const;
virtual bool ItemExists(const std::string& key);
virtual DataObjPtr GetItem(const std::string& key);
virtual engine::VecIndexPtr GetIndex(const std::string& key);
virtual ItemObj GetItem(const std::string& key);
virtual void InsertItem(const std::string& key, const DataObjPtr& data);
virtual void InsertItem(const std::string& key, const engine::VecIndexPtr& index);
virtual void InsertItem(const std::string& key, const ItemObj& data);
virtual void EraseItem(const std::string& key);
......@@ -39,6 +40,7 @@ protected:
virtual ~CacheMgr();
protected:
using CachePtr = std::shared_ptr<Cache<ItemObj>>;
CachePtr cache_;
};
......@@ -46,3 +48,5 @@ protected:
}
}
}
#include "cache/CacheMgr.inl"
\ No newline at end of file
......@@ -4,22 +4,21 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "utils/Log.h"
#include "CacheMgr.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace cache {
CacheMgr::CacheMgr() {
template<typename ItemObj>
CacheMgr<ItemObj>::CacheMgr() {
}
CacheMgr::~CacheMgr() {
template<typename ItemObj>
CacheMgr<ItemObj>::~CacheMgr() {
}
uint64_t CacheMgr::ItemCount() const {
template<typename ItemObj>
uint64_t CacheMgr<ItemObj>::ItemCount() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -28,7 +27,8 @@ uint64_t CacheMgr::ItemCount() const {
return (uint64_t)(cache_->size());
}
bool CacheMgr::ItemExists(const std::string& key) {
template<typename ItemObj>
bool CacheMgr<ItemObj>::ItemExists(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
......@@ -37,7 +37,8 @@ bool CacheMgr::ItemExists(const std::string& key) {
return cache_->exists(key);
}
DataObjPtr CacheMgr::GetItem(const std::string& key) {
template<typename ItemObj>
ItemObj CacheMgr<ItemObj>::GetItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return nullptr;
......@@ -46,16 +47,8 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
return cache_->get(key);
}
engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
return nullptr;
}
void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
template<typename ItemObj>
void CacheMgr<ItemObj>::InsertItem(const std::string& key, const ItemObj& data) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -65,18 +58,8 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::VecIndexPtr& index) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
DataObjPtr obj = std::make_shared<DataObj>(index);
cache_->insert(key, obj);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::EraseItem(const std::string& key) {
template<typename ItemObj>
void CacheMgr<ItemObj>::EraseItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -86,7 +69,8 @@ void CacheMgr::EraseItem(const std::string& key) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::PrintInfo() {
template<typename ItemObj>
void CacheMgr<ItemObj>::PrintInfo() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -95,7 +79,8 @@ void CacheMgr::PrintInfo() {
cache_->print();
}
void CacheMgr::ClearCache() {
template<typename ItemObj>
void CacheMgr<ItemObj>::ClearCache() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -104,7 +89,8 @@ void CacheMgr::ClearCache() {
cache_->clear();
}
int64_t CacheMgr::CacheUsage() const {
template<typename ItemObj>
int64_t CacheMgr<ItemObj>::CacheUsage() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -113,7 +99,8 @@ int64_t CacheMgr::CacheUsage() const {
return cache_->usage();
}
int64_t CacheMgr::CacheCapacity() const {
template<typename ItemObj>
int64_t CacheMgr<ItemObj>::CacheCapacity() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -122,7 +109,8 @@ int64_t CacheMgr::CacheCapacity() const {
return cache_->capacity();
}
void CacheMgr::SetCapacity(int64_t capacity) {
template<typename ItemObj>
void CacheMgr<ItemObj>::SetCapacity(int64_t capacity) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......
......@@ -20,7 +20,7 @@ CpuCacheMgr::CpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16);
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85);
if(free_percent > 0.0 && free_percent <= 1.0) {
......@@ -31,6 +31,20 @@ CpuCacheMgr::CpuCacheMgr() {
}
}
CpuCacheMgr* CpuCacheMgr::GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
}
engine::VecIndexPtr CpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
return nullptr;
}
}
}
}
\ No newline at end of file
......@@ -6,22 +6,21 @@
#pragma once
#include "CacheMgr.h"
#include "DataObj.h"
namespace zilliz {
namespace milvus {
namespace cache {
class CpuCacheMgr : public CacheMgr {
class CpuCacheMgr : public CacheMgr<DataObjPtr> {
private:
CpuCacheMgr();
public:
//TODO: use smart pointer instead
static CacheMgr* GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
}
static CpuCacheMgr* GetInstance();
engine::VecIndexPtr GetIndex(const std::string& key);
};
}
......
......@@ -25,7 +25,7 @@ GpuCacheMgr::GpuCacheMgr() {
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 0);
cap *= G_BYTE;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85);
if (free_percent > 0.0 && free_percent <= 1.0) {
......@@ -36,7 +36,7 @@ GpuCacheMgr::GpuCacheMgr() {
}
}
CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
GpuCacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
......@@ -49,14 +49,13 @@ CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
}
}
void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
//TODO: copy data to gpu
if (cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
engine::VecIndexPtr GpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
cache_->insert(key, data);
return nullptr;
}
}
......
......@@ -5,6 +5,8 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include "DataObj.h"
#include <unordered_map>
#include <memory>
......@@ -15,13 +17,13 @@ namespace cache {
class GpuCacheMgr;
using GpuCacheMgrPtr = std::shared_ptr<GpuCacheMgr>;
class GpuCacheMgr : public CacheMgr {
class GpuCacheMgr : public CacheMgr<DataObjPtr> {
public:
GpuCacheMgr();
static CacheMgr* GetInstance(uint64_t gpu_id);
static GpuCacheMgr* GetInstance(uint64_t gpu_id);
void InsertItem(const std::string& key, const DataObjPtr& data) override;
engine::VecIndexPtr GetIndex(const std::string& key);
private:
static std::mutex mutex_;
......
......@@ -5,7 +5,7 @@
******************************************************************************/
#pragma once
#include <easylogging++.h>
#include "utils/easylogging++.h"
namespace zilliz {
namespace milvus {
......
......@@ -5,7 +5,7 @@
******************************************************************************/
#include <stdlib.h>
#include <assert.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <boost/algorithm/string.hpp>
#include "Options.h"
......
......@@ -124,7 +124,7 @@ Status ExecutionEngineImpl::Serialize() {
}
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
try {
......@@ -151,7 +151,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
......@@ -178,7 +178,7 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
}
Status ExecutionEngineImpl::CopyToCpu() {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
......@@ -221,7 +221,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
}
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
try {
double physical_size = server::CommonUtil::GetFileSize(location);
......
......@@ -11,7 +11,7 @@
#include <cstring>
#include <string>
#include <signal.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include "metrics/Metrics.h"
#include "utils/SignalUtil.h"
......
......@@ -6,7 +6,7 @@
#pragma once
#include "Error.h"
#include <easylogging++.h>
#include "easylogging++.h"
namespace zilliz {
namespace milvus {
......
......@@ -5,8 +5,8 @@
////////////////////////////////////////////////////////////////////////////////
#include "LogUtil.h"
#include "server/ServerConfig.h"
#include "easylogging++.h"
#include <easylogging++.h>
#include <ctype.h>
#include <string>
......@@ -27,7 +27,7 @@ static int fatal_idx = 0;
}
// TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename
void RolloutHandler(const char *filename, std::size_t size) {
void RolloutHandler(const char *filename, std::size_t size, el::Level level) {
char *dirc = strdup(filename);
char *basec = strdup(filename);
char *dir = dirname(dirc);
......@@ -48,22 +48,22 @@ void RolloutHandler(const char *filename, std::size_t size) {
int ret;
std::string m(std::string(dir) + "/" + s);
s = m;
if ((position = s.find("global")) != std::string::npos) {
if (level == el::Level::Global) {
s.append("." + std::to_string(++global_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("debug")) != std::string::npos) {
} else if (level == el::Level::Debug) {
s.append("." + std::to_string(++debug_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("warning")) != std::string::npos) {
} else if (level == el::Level::Warning) {
s.append("." + std::to_string(++warning_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("trace")) != std::string::npos) {
} else if (level == el::Level::Trace) {
s.append("." + std::to_string(++trace_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("error")) != std::string::npos) {
} else if (level == el::Level::Error) {
s.append("." + std::to_string(++error_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("fatal")) != std::string::npos) {
} else if (level == el::Level::Fatal) {
s.append("." + std::to_string(++fatal_idx));
ret = rename(m.c_str(), s.c_str());
} else {
......@@ -113,6 +113,7 @@ int32_t InitLog(const std::string &log_config_file) {
el::Loggers::addFlag(el::LoggingFlag::StrictLogFileSizeCheck);
el::Helpers::installPreRollOutCallback(RolloutHandler);
el::Loggers::addFlag(el::LoggingFlag::DisableApplicationAbortOnFatalLog);
return 0;
}
......
......@@ -7,6 +7,7 @@
#include <string>
#include <sstream>
#include "easylogging++.h"
namespace zilliz {
namespace milvus {
......@@ -16,7 +17,7 @@ inline std::string GetFileName(std::string filename) {
int pos = filename.find_last_of('/');
return filename.substr(pos + 1);
}
void RolloutHandler(const char *filename, std::size_t size);
void RolloutHandler(const char *filename, std::size_t size, el::Level level);
#define SHOW_LOCATION
#ifdef SHOW_LOCATION
......
此差异已折叠。
此差异已折叠。
......@@ -6,7 +6,7 @@
#pragma once
#include <easylogging++.h>
#include "utils/easylogging++.h"
namespace zilliz {
namespace milvus {
......
......@@ -25,7 +25,6 @@ set(unittest_libs
gmock
gtest_main
gmock_main
easyloggingpp
pthread
metrics
gfortran
......
......@@ -19,7 +19,9 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
aux_source_directory(./ test_srcs)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp
${MILVUS_ENGINE_SRC}/utils/easylogging++.cc
${MILVUS_ENGINE_SRC}/utils/easylogging++.h)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
......
......@@ -12,7 +12,7 @@
#include "utils/CommonUtil.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <boost/filesystem.hpp>
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <stdlib.h>
#include <time.h>
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <boost/filesystem.hpp>
#include "db/Exception.h"
......
......@@ -9,7 +9,7 @@
#include "db/meta/MetaConsts.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <boost/filesystem.hpp>
#include <thread>
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <stdlib.h>
#include <time.h>
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <boost/filesystem.hpp>
#include "db/scheduler/TaskScheduler.h"
......
......@@ -7,7 +7,9 @@ set(knowhere_src
${MILVUS_ENGINE_SRC}/wrapper/knowhere/vec_index.cpp)
set(helper
utils.cpp)
utils.cpp
${MILVUS_ENGINE_SRC}/utils/easylogging++.cc
${MILVUS_ENGINE_SRC}/utils/easylogging++.h)
set(knowhere_libs
knowhere
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <wrapper/knowhere/vec_index.h>
#include "knowhere/index/vector_index/gpu_ivf.h"
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include "server/ServerConfig.h"
#include "utils/CommonUtil.h"
......
......@@ -18,7 +18,9 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/src/metrics metrics_src)
aux_source_directory(./ test_srcs)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp
${MILVUS_ENGINE_SRC}/utils/easylogging++.cc
${MILVUS_ENGINE_SRC}/utils/easylogging++.h)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
......
......@@ -21,7 +21,10 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
aux_source_directory(./ test_srcs)
set(util_files ${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp
${MILVUS_ENGINE_SRC}/utils/easylogging++.cc
${MILVUS_ENGINE_SRC}/utils/easylogging++.h)
set(db_scheduler_srcs
${scheduler_files}
......
......@@ -39,6 +39,11 @@ set(db_scheduler_srcs
${scheduler_task_files}
)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp
${MILVUS_ENGINE_SRC}/utils/easylogging++.cc
${MILVUS_ENGINE_SRC}/utils/easylogging++.h)
set(db_src
${config_files}
${cache_srcs}
......
......@@ -38,44 +38,8 @@ engine_config:
use_blas_threshold: 20
resource_config:
# resource list, length: 0~N
# please set a DISK resource and a CPU resource least, or system will not return query result.
#
# example:
# resource_name: # resource name, just using in connections below
# type: DISK # resource type, optional: DISK/CPU/GPU
# device_id: 0
# enable_executor: false # if is enable executor, optional: true, false
mode: simple
resources:
ssda:
type: DISK
device_id: 0
enable_executor: false
cpu:
type: CPU
device_id: 0
enable_executor: false
gpu0:
type: GPU
device_id: 0
enable_executor: true
gpu_resource_num: 2
pinned_memory: 300
temp_memory: 300
# connection list, length: 0~N
# example:
# connection_name:
# speed: 100 # unit: MS/s
# endpoint: ===
connections:
io:
speed: 500
endpoint: ssda===cpu
pcie0:
speed: 11000
endpoint: cpu===gpu0
# - cpu
- gpu0
......@@ -15,21 +15,27 @@ using namespace zilliz::milvus;
namespace {
class InvalidCacheMgr : public cache::CacheMgr {
class InvalidCacheMgr : public cache::CacheMgr<cache::DataObjPtr> {
public:
InvalidCacheMgr() {
}
};
class LessItemCacheMgr : public cache::CacheMgr {
class LessItemCacheMgr : public cache::CacheMgr<cache::DataObjPtr> {
public:
LessItemCacheMgr() {
cache_ = std::make_shared<cache::Cache>(1UL << 12, 10);
cache_ = std::make_shared<cache::Cache<cache::DataObjPtr>>(1UL << 12, 10);
}
};
class MockVecIndex : public engine::VecIndex {
public:
MockVecIndex(int64_t dim, int64_t total)
: dimension_(dim),
ntotal_(total){
}
virtual ErrorCode BuildAll(const long &nb,
const float *xb,
const long *ids,
......@@ -93,7 +99,7 @@ public:
}
public:
int64_t dimension_ = 512;
int64_t dimension_ = 256;
int64_t ntotal_ = 0;
};
......@@ -101,7 +107,7 @@ public:
TEST(CacheTest, DUMMY_TEST) {
engine::Config cfg;
MockVecIndex mock_index;
MockVecIndex mock_index(256, 1000);
mock_index.Dimension();
mock_index.Count();
mock_index.Add(1, nullptr, nullptr);
......@@ -118,29 +124,27 @@ TEST(CacheTest, DUMMY_TEST) {
}
TEST(CacheTest, CPU_CACHE_TEST) {
cache::CacheMgr *cpu_mgr = cache::CpuCacheMgr::GetInstance();
auto cpu_mgr = cache::CpuCacheMgr::GetInstance();
const int64_t gbyte = 1 << 30;
const int64_t gbyte = 1024*1024*1024;
int64_t g_num = 16;
int64_t cap = g_num * gbyte;
cpu_mgr->SetCapacity(cap);
ASSERT_EQ(cpu_mgr->CacheCapacity(), cap);
const int dim = 256;
for (int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000;//less 1G per index
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), index);
uint64_t item_count = 20;
for (uint64_t i = 0; i < item_count; i++) {
//each vector is 1k byte, total size less than 1G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
ASSERT_LT(cpu_mgr->ItemCount(), g_num);
auto obj = cpu_mgr->GetIndex("index_0");
ASSERT_TRUE(obj == nullptr);
obj = cpu_mgr->GetIndex("index_19");
obj = cpu_mgr->GetIndex("index_" + std::to_string(item_count - 1));
ASSERT_TRUE(obj != nullptr);
{
......@@ -154,30 +158,24 @@ TEST(CacheTest, CPU_CACHE_TEST) {
g_num = 5;
cpu_mgr->SetCapacity(g_num * gbyte);
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 6000000;//6G less
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_6g", index);
ASSERT_EQ(cpu_mgr->ItemCount(), 0);//data greater than capacity can not be inserted sucessfully
//each vector is 1k byte, total size less than 6G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 6000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_6g", data_obj);
ASSERT_TRUE(cpu_mgr->ItemExists("index_6g"));
}
cpu_mgr->PrintInfo();
}
TEST(CacheTest, GPU_CACHE_TEST) {
cache::CacheMgr* gpu_mgr = cache::GpuCacheMgr::GetInstance(0);
const int dim = 256;
auto gpu_mgr = cache::GpuCacheMgr::GetInstance(0);
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
gpu_mgr->InsertItem("index_" + std::to_string(i), obj);
//each vector is 1k byte
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
auto obj = gpu_mgr->GetItem("index_0");
......@@ -187,19 +185,13 @@ TEST(CacheTest, GPU_CACHE_TEST) {
for (auto i = 0; i < 3; i++) {
// TODO: use gpu index to mock
MockVecIndex *mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000; //2G
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(index);
//each vector is 1k byte, total size less than 2G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
std::cout << data_obj->size() <<std::endl;
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
// ASSERT_EQ(gpu_mgr->ItemCount(), 2);
// auto obj0 = gpu_mgr->GetItem("index_0");
// ASSERT_EQ(obj0, nullptr);
// auto obj1 = gpu_mgr->GetItem("index_1");
// auto obj2 = gpu_mgr->GetItem("index_2");
gpu_mgr->ClearCache();
ASSERT_EQ(gpu_mgr->ItemCount(), 0);
......@@ -213,7 +205,7 @@ TEST(CacheTest, INVALID_TEST) {
ASSERT_EQ(mgr.GetItem("test"), nullptr);
mgr.InsertItem("test", cache::DataObjPtr());
mgr.InsertItem("test", engine::VecIndexPtr(nullptr));
mgr.InsertItem("test", nullptr);
mgr.EraseItem("test");
mgr.PrintInfo();
mgr.ClearCache();
......@@ -225,12 +217,10 @@ TEST(CacheTest, INVALID_TEST) {
{
LessItemCacheMgr mgr;
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 2;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
mgr.InsertItem("index_" + std::to_string(i), obj);
//each vector is 1k byte
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
mgr.InsertItem("index_" + std::to_string(i), data_obj);
}
ASSERT_EQ(mgr.GetItem("index_0"), nullptr);
}
......
......@@ -5,7 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include "utils/easylogging++.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
......@@ -290,6 +290,7 @@ TEST(UtilTest, ROLLOUTHANDLER_TEST){
std::string dir1 = "/tmp/milvus_test";
std::string dir2 = "/tmp/milvus_test/log_test";
std::string filename[6] = {"log_global.log", "log_debug.log", "log_warning.log", "log_trace.log", "log_error.log", "log_fatal.log"};
el::Level list[6] = {el::Level::Global, el::Level::Debug, el::Level::Warning, el::Level::Trace, el::Level::Error, el::Level::Fatal};
mkdir(dir1.c_str(), S_IRWXU);
mkdir(dir2.c_str(), S_IRWXU);
......@@ -300,7 +301,7 @@ TEST(UtilTest, ROLLOUTHANDLER_TEST){
file.open(tmp.c_str());
file << "zilliz" << std::endl;
server::RolloutHandler(tmp.c_str(), 0);
server::RolloutHandler(tmp.c_str(), 0, list[i]);
tmp.append(".1");
std::ifstream file2;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册