提交 c5d0b433 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-455 Distribute tasks by minimal cost in scheduler

See merge request megasearch/milvus!466

Former-commit-id: 98d0fa2b40b61c6220263e89b914623dec94efdb
...@@ -74,6 +74,8 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -74,6 +74,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-442 - Merge Knowhere - MS-442 - Merge Knowhere
- MS-445 - Rename CopyCompleted to LoadCompleted - MS-445 - Rename CopyCompleted to LoadCompleted
- MS-451 - Update server_config.template file, set GPU compute default - MS-451 - Update server_config.template file, set GPU compute default
- MS-455 - Distribute tasks by minimal cost in scheduler
- MS-460 - Put transport speed as weight when choosing neighbour to execute task
- MS-459 - Add cache for pick function in tasktable - MS-459 - Add cache for pick function in tasktable
## New Feature ## New Feature
......
...@@ -54,7 +54,7 @@ set(grpc_service_files ...@@ -54,7 +54,7 @@ set(grpc_service_files
grpc/gen-milvus/milvus.pb.cc grpc/gen-milvus/milvus.pb.cc
grpc/gen-status/status.grpc.pb.cc grpc/gen-status/status.grpc.pb.cc
grpc/gen-status/status.pb.cc grpc/gen-status/status.pb.cc
) scheduler/Utils.h)
set(db_files set(db_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
constexpr uint64_t MAXINT = 99999;
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResouces().size();
std::unordered_map<uint64_t, std::string> id_name_map;
std::unordered_map<std::string, uint64_t> name_id_map;
for (uint64_t i = 0; i < num_of_resources; ++i) {
id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name()));
name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i));
}
std::vector<std::vector<uint64_t> > dis_matrix;
dis_matrix.resize(num_of_resources);
for (uint64_t i = 0; i < num_of_resources; ++i) {
dis_matrix[i].resize(num_of_resources);
for (uint64_t j = 0; j < num_of_resources; ++j) {
dis_matrix[i][j] = MAXINT;
}
dis_matrix[i][i] = 0;
}
std::vector<bool> vis(num_of_resources, false);
std::vector<uint64_t> dis(num_of_resources, MAXINT);
for (auto &res : res_mgr->GetAllResouces()) {
auto cur_node = std::static_pointer_cast<Node>(res);
auto cur_neighbours = cur_node->GetNeighbours();
for (auto &neighbour : cur_neighbours) {
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] =
neighbour.connection.transport_cost();
}
}
for (uint64_t i = 0; i < num_of_resources; ++i) {
dis[i] = dis_matrix[name_id_map.at(src->Name())][i];
}
vis[name_id_map.at(src->Name())] = true;
std::vector<int64_t> parent(num_of_resources, -1);
for (uint64_t i = 0; i < num_of_resources; ++i) {
uint64_t minn = MAXINT;
uint64_t temp = 0;
for (uint64_t j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis[j] < minn) {
minn = dis[j];
temp = j;
}
}
vis[temp] = true;
if (i == 0) {
parent[temp] = name_id_map.at(src->Name());
}
for (uint64_t j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis_matrix[temp][j] != MAXINT && dis_matrix[temp][j] + dis[temp] < dis[j]) {
dis[j] = dis_matrix[temp][j] + dis[temp];
parent[j] = temp;
}
}
}
int64_t parent_idx = parent[name_id_map.at(dest->Name())];
if (parent_idx != -1) {
path.push_back(dest->Name());
}
while (parent_idx != -1) {
path.push_back(id_name_map.at(parent_idx));
parent_idx = parent[parent_idx];
}
return dis[name_id_map.at(dest->Name())];
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string>& path);
}
}
}
\ No newline at end of file
...@@ -28,6 +28,17 @@ ResourceMgr::GetNumOfComputeResource() { ...@@ -28,6 +28,17 @@ ResourceMgr::GetNumOfComputeResource() {
return count; return count;
} }
std::vector<ResourcePtr>
ResourceMgr::GetComputeResource() {
std::vector<ResourcePtr > result;
for (auto &resource : resources_) {
if (resource->HasExecutor()) {
result.emplace_back(resource);
}
}
return result;
}
uint64_t uint64_t
ResourceMgr::GetNumGpuResource() const { ResourceMgr::GetNumGpuResource() const {
uint64_t num = 0; uint64_t num = 0;
...@@ -49,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { ...@@ -49,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
return nullptr; return nullptr;
} }
ResourcePtr
ResourceMgr::GetResourceByName(std::string name) {
for (auto &resource : resources_) {
if (resource->Name() == name) {
return resource;
}
}
return nullptr;
}
std::vector<ResourcePtr>
ResourceMgr::GetAllResouces() {
return resources_;
}
ResourceWPtr ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) { ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource); ResourceWPtr ret(resource);
......
...@@ -41,12 +41,21 @@ public: ...@@ -41,12 +41,21 @@ public:
ResourcePtr ResourcePtr
GetResource(ResourceType type, uint64_t device_id); GetResource(ResourceType type, uint64_t device_id);
ResourcePtr
GetResourceByName(std::string name);
std::vector<ResourcePtr>
GetAllResouces();
/* /*
* Return account of resource which enable executor; * Return account of resource which enable executor;
*/ */
uint64_t uint64_t
GetNumOfComputeResource(); GetNumOfComputeResource();
std::vector<ResourcePtr>
GetComputeResource();
/* /*
* Add resource into Resource Management; * Add resource into Resource Management;
* Generate functions on events; * Generate functions on events;
......
...@@ -43,14 +43,21 @@ StartSchedulerService() { ...@@ -43,14 +43,21 @@ StartSchedulerService() {
knowhere::FaissGpuResourceMgr::GetInstance().InitResource(); knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
auto default_connection = Connection("default_connection", 500.0); // auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS); auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
for (auto &conn : connections) { for (auto &conn : connections) {
auto &connect_name = conn.first;
auto &connect_conf = conn.second;
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
std::string delimiter = "==="; std::string delimiter = "===";
std::string left = conn.substr(0, conn.find(delimiter)); std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
std::string right = conn.substr(conn.find(delimiter) + 3, conn.length()); std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
connect_endpoint.length());
ResMgrInst::GetInstance()->Connect(left, right, default_connection); auto connection = Connection(connect_name, connect_speed);
ResMgrInst::GetInstance()->Connect(left, right, connection);
} }
ResMgrInst::GetInstance()->Start(); ResMgrInst::GetInstance()->Start();
......
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
******************************************************************************/ ******************************************************************************/
#include <src/cache/GpuCacheMgr.h> #include <src/cache/GpuCacheMgr.h>
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h" #include "Scheduler.h"
#include "action/Action.h" #include "action/Action.h"
#include "Algorithm.h"
namespace zilliz { namespace zilliz {
...@@ -136,6 +138,54 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { ...@@ -136,6 +138,54 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
} }
break; break;
} }
case TaskLabelType::SPECIFIED_RESOURCE: {
auto self = event->resource_.lock();
auto task = load_completed_event->task_table_item_->task;
// if this resource is disk, assign it to smallest cost resource
if (self->Type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResource();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t > transport_costs;
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
+ transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
if(self->Name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name);
load_completed_event->task_table_item_->Move();
next_res->task_table().Put(task);
}
break;
}
case TaskLabelType::BROADCAST: { case TaskLabelType::BROADCAST: {
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break; break;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include "Utils.h"
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_current_timestamp()
{
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <cstdint>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_current_timestamp();
}
}
}
\ No newline at end of file
...@@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) { ...@@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) {
return neighbours; return neighbours;
} }
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr &self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
// if (not resource->HasExecutor()) continue;
Connection conn = neighbour_node.connection;
neighbours.emplace_back(std::make_pair(resource, conn));
}
return neighbours;
}
void void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task, Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) { const ResourcePtr &self) {
auto neighbours = get_neighbours(self); auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) { if (not neighbours.empty()) {
std::vector<uint64_t > speeds;
uint64_t total_speed = 0;
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed);
total_speed += speed;
}
std::random_device rd; std::random_device rd;
std::mt19937 mt(rd()); std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1); std::uniform_int_distribution<int> dist(0, total_speed);
uint64_t index = 0;
int64_t rd_speed = dist(mt);
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
return;
}
}
neighbours[dist(mt)]->task_table().Put(task);
} else { } else {
//TODO: process //TODO: process
} }
......
...@@ -19,15 +19,20 @@ public: ...@@ -19,15 +19,20 @@ public:
: name_(std::move(name)), speed_(speed) {} : name_(std::move(name)), speed_(speed) {}
const std::string & const std::string &
get_name() const { name() const {
return name_; return name_;
} }
const double uint64_t
get_speed() const { speed() const {
return speed_; return speed_;
} }
uint64_t
transport_cost() {
return 1024 / speed_;
}
public: public:
std::string std::string
Dump() const { Dump() const {
...@@ -38,7 +43,7 @@ public: ...@@ -38,7 +43,7 @@ public:
private: private:
std::string name_; std::string name_;
double speed_; uint64_t speed_;
}; };
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include <iostream> #include <iostream>
#include "../Utils.h"
#include "Resource.h" #include "Resource.h"
...@@ -138,7 +139,13 @@ void Resource::executor_function() { ...@@ -138,7 +139,13 @@ void Resource::executor_function() {
if (task_item == nullptr) { if (task_item == nullptr) {
break; break;
} }
auto start = get_current_timestamp();
Process(task_item->task); Process(task_item->task);
auto finish = get_current_timestamp();
++total_task_;
total_cost_ += finish - start;
task_item->Executed(); task_item->Executed();
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item); auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
......
...@@ -43,7 +43,7 @@ enum class RegisterType { ...@@ -43,7 +43,7 @@ enum class RegisterType {
}; };
class Resource : public Node, public std::enable_shared_from_this<Resource> { class Resource : public Node, public std::enable_shared_from_this<Resource> {
public: public:
/* /*
* Start loader and executor if enable; * Start loader and executor if enable;
*/ */
...@@ -68,7 +68,7 @@ public: ...@@ -68,7 +68,7 @@ public:
void void
WakeupExecutor(); WakeupExecutor();
public: public:
template<typename T> template<typename T>
void Register_T(const RegisterType &type) { void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); }); register_table_.emplace(type, [] { return std::make_shared<T>(); });
...@@ -109,6 +109,27 @@ public: ...@@ -109,6 +109,27 @@ public:
return enable_executor_; return enable_executor_;
} }
// TODO: const
uint64_t
NumOfTaskToExec() {
uint64_t count = 0;
for (auto &task : task_table_) {
if (task->state == TaskTableItemState::LOADED) ++count;
}
return count;
}
// TODO: need double ?
inline uint64_t
TaskAvgCost() const {
return total_cost_ / total_task_;
}
inline uint64_t
TotalTasks() const {
return total_task_;
}
TaskTable & TaskTable &
task_table(); task_table();
...@@ -119,7 +140,7 @@ public: ...@@ -119,7 +140,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Resource &resource); friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
protected: protected:
Resource(std::string name, Resource(std::string name,
ResourceType type, ResourceType type,
uint64_t device_id, uint64_t device_id,
...@@ -141,7 +162,7 @@ protected: ...@@ -141,7 +162,7 @@ protected:
virtual void virtual void
Process(TaskPtr task) = 0; Process(TaskPtr task) = 0;
private: private:
/* /*
* These function should move to cost.h ??? * These function should move to cost.h ???
* COST.H ??? * COST.H ???
...@@ -161,7 +182,7 @@ private: ...@@ -161,7 +182,7 @@ private:
TaskTableItemPtr TaskTableItemPtr
pick_task_execute(); pick_task_execute();
private: private:
/* /*
* Only called by load thread; * Only called by load thread;
*/ */
...@@ -174,14 +195,17 @@ private: ...@@ -174,14 +195,17 @@ private:
void void
executor_function(); executor_function();
protected: protected:
uint64_t device_id_; uint64_t device_id_;
std::string name_; std::string name_;
private: private:
ResourceType type_; ResourceType type_;
TaskTable task_table_; TaskTable task_table_;
uint64_t total_cost_ = 0;
uint64_t total_task_ = 0;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_; std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr; std::function<void(EventPtr)> subscriber_ = nullptr;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
class Path {
public:
Path() = default;
Path(std::vector<std::string>& path, uint64_t index) : path_(path), index_(index) {}
void
push_back(const std::string &str) {
path_.push_back(str);
}
std::vector<std::string>
Dump() {
return path_;
}
std::string
Next() {
if (index_ > 0 && !path_.empty()) {
--index_;
return path_[index_];
} else {
return nullptr;
}
}
std::string
Last() {
if (!path_.empty()) {
return path_[0];
} else {
return nullptr;
}
}
public:
std::string &
operator[](uint64_t index) {
return path_[index];
}
std::vector<std::string>::iterator begin() { return path_.begin(); }
std::vector<std::string>::iterator end() { return path_.end(); }
public:
std::vector<std::string> path_;
uint64_t index_ = 0;
};
}
}
}
\ No newline at end of file
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "db/scheduler/context/SearchContext.h" #include "db/scheduler/context/SearchContext.h"
#include "db/scheduler/task/IScheduleTask.h" #include "db/scheduler/task/IScheduleTask.h"
#include "scheduler/tasklabel/TaskLabel.h" #include "scheduler/tasklabel/TaskLabel.h"
#include "Path.h"
#include <string> #include <string>
#include <memory> #include <memory>
...@@ -44,6 +45,14 @@ public: ...@@ -44,6 +45,14 @@ public:
inline TaskType inline TaskType
Type() const { return type_; } Type() const { return type_; }
/*
* Transport path;
*/
inline Path&
path() {
return task_path_;
}
/* /*
* Getter and Setter; * Getter and Setter;
*/ */
...@@ -64,6 +73,7 @@ public: ...@@ -64,6 +73,7 @@ public:
Clone() = 0; Clone() = 0;
public: public:
Path task_path_;
std::vector<SearchContextPtr> search_contexts_; std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_; ScheduleTaskPtr task_;
TaskType type_; TaskType type_;
......
...@@ -22,24 +22,24 @@ namespace engine { ...@@ -22,24 +22,24 @@ namespace engine {
class SpecResLabel : public TaskLabel { class SpecResLabel : public TaskLabel {
public: public:
SpecResLabel(const ResourceWPtr &resource) SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {} : TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {}
inline ResourceWPtr & inline ResourceWPtr &
resource() const { resource() {
return resource_; return resource_;
} }
inline std::string & inline std::string &
resource_name() const { resource_name() {
return resource_name_; return resource_name_;
} }
private: private:
ResourceWPtr resource_; ResourceWPtr resource_;
std::string resource_name_; std::string resource_name_;
} };
using SpecResLabelPtr = std::make_shared<SpecResLabel>; using SpecResLabelPtr = std::shared_ptr<SpecResLabel>();
} }
} }
......
...@@ -13,7 +13,7 @@ namespace engine { ...@@ -13,7 +13,7 @@ namespace engine {
enum class TaskLabelType { enum class TaskLabelType {
DEFAULT, // means can be executed in any resource DEFAULT, // means can be executed in any resource
SPECIAL_RESOURCE, // means must executing in special resource SPECIFIED_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task BROADCAST, // means all enable-executor resource must execute task
}; };
......
...@@ -18,175 +18,173 @@ using namespace milvus; ...@@ -18,175 +18,173 @@ using namespace milvus;
//#define SET_VECTOR_IDS; //#define SET_VECTOR_IDS;
namespace { namespace {
std::string GetTableName(); std::string GetTableName();
const std::string TABLE_NAME = GetTableName(); const std::string TABLE_NAME = GetTableName();
constexpr int64_t TABLE_DIMENSION = 512; constexpr int64_t TABLE_DIMENSION = 512;
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
constexpr int64_t BATCH_ROW_COUNT = 1000000; constexpr int64_t BATCH_ROW_COUNT = 100000;
constexpr int64_t NQ = 100; constexpr int64_t NQ = 100;
constexpr int64_t TOP_K = 10; constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1; constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600; constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl; #define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
void PrintTableSchema(const TableSchema& tb_schema) { void PrintTableSchema(const TableSchema& tb_schema) {
BLOCK_SPLITER BLOCK_SPLITER
std::cout << "Table name: " << tb_schema.table_name << std::endl; std::cout << "Table name: " << tb_schema.table_name << std::endl;
std::cout << "Table dimension: " << tb_schema.dimension << std::endl; std::cout << "Table dimension: " << tb_schema.dimension << std::endl;
BLOCK_SPLITER BLOCK_SPLITER
} }
void PrintSearchResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array, void PrintSearchResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) { const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER BLOCK_SPLITER
std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl;
int32_t index = 0; int32_t index = 0;
for(auto& result : topk_query_result_array) { for(auto& result : topk_query_result_array) {
auto search_id = search_record_array[index].first; auto search_id = search_record_array[index].first;
index++; index++;
std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id)
<< " top " << std::to_string(result.query_result_arrays.size()) << " top " << std::to_string(result.query_result_arrays.size())
<< " search result:" << std::endl; << " search result:" << std::endl;
for(auto& item : result.query_result_arrays) { for(auto& item : result.query_result_arrays) {
std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance);
std::cout << std::endl; std::cout << std::endl;
}
} }
BLOCK_SPLITER
} }
std::string CurrentTime() { BLOCK_SPLITER
time_t tt; }
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) std::string CurrentTime() {
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) time_t tt;
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
return str; std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
} + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
std::string CurrentTmDate(int64_t offset_day = 0) { return str;
time_t tt; }
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) std::string CurrentTmDate(int64_t offset_day = 0) {
+ "-" + std::to_string(t->tm_mday); time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
return str; std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
} + "-" + std::to_string(t->tm_mday);
std::string GetTableName() { return str;
static std::string s_id(CurrentTime()); }
return "tbl_" + s_id;
}
TableSchema BuildTableSchema() { std::string GetTableName() {
TableSchema tb_schema; static std::string s_id(CurrentTime());
tb_schema.table_name = TABLE_NAME; return "tbl_" + s_id;
tb_schema.dimension = TABLE_DIMENSION; }
tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE;
return tb_schema; TableSchema BuildTableSchema() {
} TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE;
void BuildVectors(int64_t from, int64_t to, return tb_schema;
std::vector<RowRecord>& vector_record_array) { }
if(to <= from){
return;
}
vector_record_array.clear(); void BuildVectors(int64_t from, int64_t to,
for (int64_t k = from; k < to; k++) { std::vector<RowRecord>& vector_record_array) {
RowRecord record; if(to <= from){
record.data.resize(TABLE_DIMENSION); return;
for(int64_t i = 0; i < TABLE_DIMENSION; i++) { }
record.data[i] = (float)(k%(i+1));
}
vector_record_array.emplace_back(record); vector_record_array.clear();
for (int64_t k = from; k < to; k++) {
RowRecord record;
record.data.resize(TABLE_DIMENSION);
for(int64_t i = 0; i < TABLE_DIMENSION; i++) {
record.data[i] = (float)(k%(i+1));
} }
}
void Sleep(int seconds) { vector_record_array.emplace_back(record);
std::cout << "Waiting " << seconds << " seconds ..." << std::endl;
sleep(seconds);
} }
}
class TimeRecorder { void Sleep(int seconds) {
public: std::cout << "Waiting " << seconds << " seconds ..." << std::endl;
explicit TimeRecorder(const std::string& title) sleep(seconds);
: title_(title) { }
start_ = std::chrono::system_clock::now();
}
~TimeRecorder() {
std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
long span = (std::chrono::duration_cast<std::chrono::milliseconds> (end - start_)).count();
std::cout << title_ << " totally cost: " << span << " ms" << std::endl;
}
private: class TimeRecorder {
std::string title_; public:
std::chrono::system_clock::time_point start_; explicit TimeRecorder(const std::string& title)
}; : title_(title) {
start_ = std::chrono::system_clock::now();
void CheckResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
int64_t index = 0;
for(auto& result : topk_query_result_array) {
auto result_id = result.query_result_arrays[0].id;
auto search_id = search_record_array[index++].first;
if(result_id != search_id) {
std::cout << "The top 1 result is wrong: " << result_id
<< " vs. " << search_id << std::endl;
} else {
std::cout << "Check result sucessfully" << std::endl;
}
}
BLOCK_SPLITER
} }
void DoSearch(std::shared_ptr<Connection> conn, ~TimeRecorder() {
const std::vector<std::pair<int64_t, RowRecord>>& search_record_array, std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
const std::string& phase_name) { long span = (std::chrono::duration_cast<std::chrono::milliseconds> (end - start_)).count();
std::vector<Range> query_range_array; std::cout << title_ << " totally cost: " << span << " ms" << std::endl;
Range rg; }
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate(1);
query_range_array.emplace_back(rg);
std::vector<RowRecord> record_array; private:
for(auto& pair : search_record_array) { std::string title_;
record_array.push_back(pair.second); std::chrono::system_clock::time_point start_;
};
void CheckResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
int64_t index = 0;
for(auto& result : topk_query_result_array) {
auto result_id = result.query_result_arrays[0].id;
auto search_id = search_record_array[index++].first;
if(result_id != search_id) {
std::cout << "The top 1 result is wrong: " << result_id
<< " vs. " << search_id << std::endl;
} else {
std::cout << "Check result sucessfully" << std::endl;
} }
}
BLOCK_SPLITER
}
auto start = std::chrono::high_resolution_clock::now(); void DoSearch(std::shared_ptr<Connection> conn,
for (auto i = 0; i < 5; ++i) { const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
std::vector<TopKQueryResult> topk_query_result_array; const std::string& phase_name) {
{ std::vector<Range> query_range_array;
TimeRecorder rc(phase_name); Range rg;
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); rg.start_value = CurrentTmDate();
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; rg.end_value = CurrentTmDate(1);
} query_range_array.emplace_back(rg);
}
auto finish = std::chrono::high_resolution_clock::now(); std::vector<RowRecord> record_array;
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n"; for(auto& pair : search_record_array) {
record_array.push_back(pair.second);
}
// PrintSearchResult(search_record_array, topk_query_result_array); auto start = std::chrono::high_resolution_clock::now();
// CheckResult(search_record_array, topk_query_result_array); std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
} }
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
PrintSearchResult(search_record_array, topk_query_result_array);
CheckResult(search_record_array, topk_query_result_array);
}
} }
void void
...@@ -216,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) { ...@@ -216,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "All tables: " << std::endl; std::cout << "All tables: " << std::endl;
for(auto& table : tables) { for(auto& table : tables) {
int64_t row_count = 0; int64_t row_count = 0;
// conn->DropTable(table); conn->DropTable(table);
stat = conn->CountTable(table, row_count); // stat = conn->CountTable(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; // std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
} }
} }
...@@ -273,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { ...@@ -273,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
if(search_record_array.size() < NQ) { if(search_record_array.size() < NQ) {
search_record_array.push_back( search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
} }
} }
} }
...@@ -345,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) { ...@@ -345,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::string status = conn->ServerStatus(); std::string status = conn->ServerStatus();
std::cout << "Server status after disconnect: " << status << std::endl; std::cout << "Server status after disconnect: " << status << std::endl;
} }
} }
\ No newline at end of file
...@@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; ...@@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id";
static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader";
static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor";
static const char* CONFIG_RESOURCE_CONNECTIONS = "connections"; static const char* CONFIG_RESOURCE_CONNECTIONS = "connections";
static const char* CONFIG_SPEED_CONNECTIONS = "speed";
static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections";
class ServerConfig { class ServerConfig {
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
#include "scheduler/resource/Resource.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
class AlgorithmTest : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, true);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE = Connection("PCIE", 11.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE);
res_mgr_->Connect("cpu2", "gpu1", PCIE);
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
};
TEST_F(AlgorithmTest, ShortestPath_test) {
std::vector<std::string> sp;
uint64_t cost;
cost = ShortestPath(disk_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(disk_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_2_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
}
}
}
}
\ No newline at end of file
...@@ -30,7 +30,7 @@ protected: ...@@ -30,7 +30,7 @@ protected:
resources_.push_back(gpu_resource_); resources_.push_back(gpu_resource_);
auto subscriber = [&](EventPtr event) { auto subscriber = [&](EventPtr event) {
if (event->Type() == EventType::COPY_COMPLETED) { if (event->Type() == EventType::LOAD_COMPLETED) {
std::lock_guard<std::mutex> lock(load_mutex_); std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_; ++load_count_;
cv_.notify_one(); cv_.notify_one();
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "scheduler/resource/Resource.h" #include "scheduler/resource/Resource.h"
#include "utils/Error.h" #include "utils/Error.h"
#include "wrapper/knowhere/vec_index.h" #include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -122,9 +123,6 @@ protected: ...@@ -122,9 +123,6 @@ protected:
ResourceMgrPtr res_mgr_; ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_; std::shared_ptr<Scheduler> scheduler_;
uint64_t load_count_ = 0;
std::mutex load_mutex_;
std::condition_variable cv_;
}; };
void void
...@@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) { ...@@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) {
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
} }
class SchedulerTest2 : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE1 = Connection("PCIE", 11.0);
auto PCIE2 = Connection("PCIE", 20.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE1);
res_mgr_->Connect("cpu2", "gpu1", PCIE2);
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
res_mgr_->Start();
scheduler_->Start();
}
void
TearDown() override {
scheduler_->Stop();
res_mgr_->Stop();
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
};
TEST_F(SchedulerTest2, SpecifiedResourceTest) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
for (uint64_t i = 0; i < NUM; ++i) {
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<SpecResLabel>(disk_);
tasks.push_back(task);
disk_.lock()->task_table().Put(task);
}
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册