提交 7af80a71 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-455 Distribute tasks by minimal cost in scheduler

See merge request megasearch/milvus!466

Former-commit-id: ee4152eeed45d3b3812faa8b0915998ff726f0ec
......@@ -74,6 +74,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-442 - Merge Knowhere
- MS-445 - Rename CopyCompleted to LoadCompleted
- MS-451 - Update server_config.template file, set GPU compute default
- MS-455 - Distribute tasks by minimal cost in scheduler
- MS-460 - Put transport speed as weight when choosing neighbour to execute task
- MS-459 - Add cache for pick function in tasktable
## New Feature
......
......@@ -54,7 +54,7 @@ set(grpc_service_files
grpc/gen-milvus/milvus.pb.cc
grpc/gen-status/status.grpc.pb.cc
grpc/gen-status/status.pb.cc
)
scheduler/Utils.h)
set(db_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
constexpr uint64_t MAXINT = 99999;
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResouces().size();
std::unordered_map<uint64_t, std::string> id_name_map;
std::unordered_map<std::string, uint64_t> name_id_map;
for (uint64_t i = 0; i < num_of_resources; ++i) {
id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name()));
name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i));
}
std::vector<std::vector<uint64_t> > dis_matrix;
dis_matrix.resize(num_of_resources);
for (uint64_t i = 0; i < num_of_resources; ++i) {
dis_matrix[i].resize(num_of_resources);
for (uint64_t j = 0; j < num_of_resources; ++j) {
dis_matrix[i][j] = MAXINT;
}
dis_matrix[i][i] = 0;
}
std::vector<bool> vis(num_of_resources, false);
std::vector<uint64_t> dis(num_of_resources, MAXINT);
for (auto &res : res_mgr->GetAllResouces()) {
auto cur_node = std::static_pointer_cast<Node>(res);
auto cur_neighbours = cur_node->GetNeighbours();
for (auto &neighbour : cur_neighbours) {
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] =
neighbour.connection.transport_cost();
}
}
for (uint64_t i = 0; i < num_of_resources; ++i) {
dis[i] = dis_matrix[name_id_map.at(src->Name())][i];
}
vis[name_id_map.at(src->Name())] = true;
std::vector<int64_t> parent(num_of_resources, -1);
for (uint64_t i = 0; i < num_of_resources; ++i) {
uint64_t minn = MAXINT;
uint64_t temp = 0;
for (uint64_t j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis[j] < minn) {
minn = dis[j];
temp = j;
}
}
vis[temp] = true;
if (i == 0) {
parent[temp] = name_id_map.at(src->Name());
}
for (uint64_t j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis_matrix[temp][j] != MAXINT && dis_matrix[temp][j] + dis[temp] < dis[j]) {
dis[j] = dis_matrix[temp][j] + dis[temp];
parent[j] = temp;
}
}
}
int64_t parent_idx = parent[name_id_map.at(dest->Name())];
if (parent_idx != -1) {
path.push_back(dest->Name());
}
while (parent_idx != -1) {
path.push_back(id_name_map.at(parent_idx));
parent_idx = parent[parent_idx];
}
return dis[name_id_map.at(dest->Name())];
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string>& path);
}
}
}
\ No newline at end of file
......@@ -28,6 +28,17 @@ ResourceMgr::GetNumOfComputeResource() {
return count;
}
std::vector<ResourcePtr>
ResourceMgr::GetComputeResource() {
std::vector<ResourcePtr > result;
for (auto &resource : resources_) {
if (resource->HasExecutor()) {
result.emplace_back(resource);
}
}
return result;
}
uint64_t
ResourceMgr::GetNumGpuResource() const {
uint64_t num = 0;
......@@ -49,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
return nullptr;
}
ResourcePtr
ResourceMgr::GetResourceByName(std::string name) {
for (auto &resource : resources_) {
if (resource->Name() == name) {
return resource;
}
}
return nullptr;
}
std::vector<ResourcePtr>
ResourceMgr::GetAllResouces() {
return resources_;
}
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
......
......@@ -41,12 +41,21 @@ public:
ResourcePtr
GetResource(ResourceType type, uint64_t device_id);
ResourcePtr
GetResourceByName(std::string name);
std::vector<ResourcePtr>
GetAllResouces();
/*
* Return account of resource which enable executor;
*/
uint64_t
GetNumOfComputeResource();
std::vector<ResourcePtr>
GetComputeResource();
/*
* Add resource into Resource Management;
* Generate functions on events;
......
......@@ -43,14 +43,21 @@ StartSchedulerService() {
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS);
// auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
for (auto &conn : connections) {
auto &connect_name = conn.first;
auto &connect_conf = conn.second;
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
std::string delimiter = "===";
std::string left = conn.substr(0, conn.find(delimiter));
std::string right = conn.substr(conn.find(delimiter) + 3, conn.length());
std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
connect_endpoint.length());
ResMgrInst::GetInstance()->Connect(left, right, default_connection);
auto connection = Connection(connect_name, connect_speed);
ResMgrInst::GetInstance()->Connect(left, right, connection);
}
ResMgrInst::GetInstance()->Start();
......
......@@ -5,8 +5,10 @@
******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "action/Action.h"
#include "Algorithm.h"
namespace zilliz {
......@@ -136,6 +138,54 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
}
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
auto self = event->resource_.lock();
auto task = load_completed_event->task_table_item_->task;
// if this resource is disk, assign it to smallest cost resource
if (self->Type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResource();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t > transport_costs;
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
+ transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
if(self->Name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name);
load_completed_event->task_table_item_->Move();
next_res->task_table().Put(task);
}
break;
}
case TaskLabelType::BROADCAST: {
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include "Utils.h"
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_current_timestamp()
{
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <cstdint>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_current_timestamp();
}
}
}
\ No newline at end of file
......@@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) {
return neighbours;
}
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr &self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
// if (not resource->HasExecutor()) continue;
Connection conn = neighbour_node.connection;
neighbours.emplace_back(std::make_pair(resource, conn));
}
return neighbours;
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t > speeds;
uint64_t total_speed = 0;
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed);
total_speed += speed;
}
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
std::uniform_int_distribution<int> dist(0, total_speed);
uint64_t index = 0;
int64_t rd_speed = dist(mt);
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
return;
}
}
neighbours[dist(mt)]->task_table().Put(task);
} else {
//TODO: process
}
......
......@@ -19,15 +19,20 @@ public:
: name_(std::move(name)), speed_(speed) {}
const std::string &
get_name() const {
name() const {
return name_;
}
const double
get_speed() const {
uint64_t
speed() const {
return speed_;
}
uint64_t
transport_cost() {
return 1024 / speed_;
}
public:
std::string
Dump() const {
......@@ -38,7 +43,7 @@ public:
private:
std::string name_;
double speed_;
uint64_t speed_;
};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "../Utils.h"
#include "Resource.h"
......@@ -138,7 +139,13 @@ void Resource::executor_function() {
if (task_item == nullptr) {
break;
}
auto start = get_current_timestamp();
Process(task_item->task);
auto finish = get_current_timestamp();
++total_task_;
total_cost_ += finish - start;
task_item->Executed();
if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
......
......@@ -43,7 +43,7 @@ enum class RegisterType {
};
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
public:
/*
* Start loader and executor if enable;
*/
......@@ -68,7 +68,7 @@ public:
void
WakeupExecutor();
public:
public:
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
......@@ -109,6 +109,27 @@ public:
return enable_executor_;
}
// TODO: const
uint64_t
NumOfTaskToExec() {
uint64_t count = 0;
for (auto &task : task_table_) {
if (task->state == TaskTableItemState::LOADED) ++count;
}
return count;
}
// TODO: need double ?
inline uint64_t
TaskAvgCost() const {
return total_cost_ / total_task_;
}
inline uint64_t
TotalTasks() const {
return total_task_;
}
TaskTable &
task_table();
......@@ -119,7 +140,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
protected:
protected:
Resource(std::string name,
ResourceType type,
uint64_t device_id,
......@@ -141,7 +162,7 @@ protected:
virtual void
Process(TaskPtr task) = 0;
private:
private:
/*
* These function should move to cost.h ???
* COST.H ???
......@@ -161,7 +182,7 @@ private:
TaskTableItemPtr
pick_task_execute();
private:
private:
/*
* Only called by load thread;
*/
......@@ -174,14 +195,17 @@ private:
void
executor_function();
protected:
protected:
uint64_t device_id_;
std::string name_;
private:
private:
ResourceType type_;
TaskTable task_table_;
uint64_t total_cost_ = 0;
uint64_t total_task_ = 0;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
class Path {
public:
Path() = default;
Path(std::vector<std::string>& path, uint64_t index) : path_(path), index_(index) {}
void
push_back(const std::string &str) {
path_.push_back(str);
}
std::vector<std::string>
Dump() {
return path_;
}
std::string
Next() {
if (index_ > 0 && !path_.empty()) {
--index_;
return path_[index_];
} else {
return nullptr;
}
}
std::string
Last() {
if (!path_.empty()) {
return path_[0];
} else {
return nullptr;
}
}
public:
std::string &
operator[](uint64_t index) {
return path_[index];
}
std::vector<std::string>::iterator begin() { return path_.begin(); }
std::vector<std::string>::iterator end() { return path_.end(); }
public:
std::vector<std::string> path_;
uint64_t index_ = 0;
};
}
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@
#include "db/scheduler/context/SearchContext.h"
#include "db/scheduler/task/IScheduleTask.h"
#include "scheduler/tasklabel/TaskLabel.h"
#include "Path.h"
#include <string>
#include <memory>
......@@ -44,6 +45,14 @@ public:
inline TaskType
Type() const { return type_; }
/*
* Transport path;
*/
inline Path&
path() {
return task_path_;
}
/*
* Getter and Setter;
*/
......@@ -64,6 +73,7 @@ public:
Clone() = 0;
public:
Path task_path_;
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
TaskType type_;
......
......@@ -22,24 +22,24 @@ namespace engine {
class SpecResLabel : public TaskLabel {
public:
SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {}
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {}
inline ResourceWPtr &
resource() const {
resource() {
return resource_;
}
inline std::string &
resource_name() const {
resource_name() {
return resource_name_;
}
private:
ResourceWPtr resource_;
std::string resource_name_;
}
};
using SpecResLabelPtr = std::make_shared<SpecResLabel>;
using SpecResLabelPtr = std::shared_ptr<SpecResLabel>();
}
}
......
......@@ -13,7 +13,7 @@ namespace engine {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIAL_RESOURCE, // means must executing in special resource
SPECIFIED_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task
};
......
......@@ -18,175 +18,173 @@ using namespace milvus;
//#define SET_VECTOR_IDS;
namespace {
std::string GetTableName();
const std::string TABLE_NAME = GetTableName();
constexpr int64_t TABLE_DIMENSION = 512;
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
constexpr int64_t BATCH_ROW_COUNT = 1000000;
constexpr int64_t NQ = 100;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
std::string GetTableName();
const std::string TABLE_NAME = GetTableName();
constexpr int64_t TABLE_DIMENSION = 512;
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
constexpr int64_t BATCH_ROW_COUNT = 100000;
constexpr int64_t NQ = 100;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
void PrintTableSchema(const TableSchema& tb_schema) {
BLOCK_SPLITER
std::cout << "Table name: " << tb_schema.table_name << std::endl;
std::cout << "Table dimension: " << tb_schema.dimension << std::endl;
BLOCK_SPLITER
}
void PrintTableSchema(const TableSchema& tb_schema) {
BLOCK_SPLITER
std::cout << "Table name: " << tb_schema.table_name << std::endl;
std::cout << "Table dimension: " << tb_schema.dimension << std::endl;
BLOCK_SPLITER
}
void PrintSearchResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl;
int32_t index = 0;
for(auto& result : topk_query_result_array) {
auto search_id = search_record_array[index].first;
index++;
std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id)
<< " top " << std::to_string(result.query_result_arrays.size())
<< " search result:" << std::endl;
for(auto& item : result.query_result_arrays) {
std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance);
std::cout << std::endl;
}
void PrintSearchResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl;
int32_t index = 0;
for(auto& result : topk_query_result_array) {
auto search_id = search_record_array[index].first;
index++;
std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id)
<< " top " << std::to_string(result.query_result_arrays.size())
<< " search result:" << std::endl;
for(auto& item : result.query_result_arrays) {
std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance);
std::cout << std::endl;
}
BLOCK_SPLITER
}
std::string CurrentTime() {
time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
BLOCK_SPLITER
}
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
std::string CurrentTime() {
time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
return str;
}
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
std::string CurrentTmDate(int64_t offset_day = 0) {
time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
return str;
}
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
+ "-" + std::to_string(t->tm_mday);
std::string CurrentTmDate(int64_t offset_day = 0) {
time_t tt;
time( &tt );
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
return str;
}
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
+ "-" + std::to_string(t->tm_mday);
std::string GetTableName() {
static std::string s_id(CurrentTime());
return "tbl_" + s_id;
}
return str;
}
TableSchema BuildTableSchema() {
TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE;
std::string GetTableName() {
static std::string s_id(CurrentTime());
return "tbl_" + s_id;
}
return tb_schema;
}
TableSchema BuildTableSchema() {
TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE;
void BuildVectors(int64_t from, int64_t to,
std::vector<RowRecord>& vector_record_array) {
if(to <= from){
return;
}
return tb_schema;
}
vector_record_array.clear();
for (int64_t k = from; k < to; k++) {
RowRecord record;
record.data.resize(TABLE_DIMENSION);
for(int64_t i = 0; i < TABLE_DIMENSION; i++) {
record.data[i] = (float)(k%(i+1));
}
void BuildVectors(int64_t from, int64_t to,
std::vector<RowRecord>& vector_record_array) {
if(to <= from){
return;
}
vector_record_array.emplace_back(record);
vector_record_array.clear();
for (int64_t k = from; k < to; k++) {
RowRecord record;
record.data.resize(TABLE_DIMENSION);
for(int64_t i = 0; i < TABLE_DIMENSION; i++) {
record.data[i] = (float)(k%(i+1));
}
}
void Sleep(int seconds) {
std::cout << "Waiting " << seconds << " seconds ..." << std::endl;
sleep(seconds);
vector_record_array.emplace_back(record);
}
}
class TimeRecorder {
public:
explicit TimeRecorder(const std::string& title)
: title_(title) {
start_ = std::chrono::system_clock::now();
}
~TimeRecorder() {
std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
long span = (std::chrono::duration_cast<std::chrono::milliseconds> (end - start_)).count();
std::cout << title_ << " totally cost: " << span << " ms" << std::endl;
}
void Sleep(int seconds) {
std::cout << "Waiting " << seconds << " seconds ..." << std::endl;
sleep(seconds);
}
private:
std::string title_;
std::chrono::system_clock::time_point start_;
};
void CheckResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
int64_t index = 0;
for(auto& result : topk_query_result_array) {
auto result_id = result.query_result_arrays[0].id;
auto search_id = search_record_array[index++].first;
if(result_id != search_id) {
std::cout << "The top 1 result is wrong: " << result_id
<< " vs. " << search_id << std::endl;
} else {
std::cout << "Check result sucessfully" << std::endl;
}
}
BLOCK_SPLITER
class TimeRecorder {
public:
explicit TimeRecorder(const std::string& title)
: title_(title) {
start_ = std::chrono::system_clock::now();
}
void DoSearch(std::shared_ptr<Connection> conn,
const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::string& phase_name) {
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate(1);
query_range_array.emplace_back(rg);
~TimeRecorder() {
std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
long span = (std::chrono::duration_cast<std::chrono::milliseconds> (end - start_)).count();
std::cout << title_ << " totally cost: " << span << " ms" << std::endl;
}
std::vector<RowRecord> record_array;
for(auto& pair : search_record_array) {
record_array.push_back(pair.second);
private:
std::string title_;
std::chrono::system_clock::time_point start_;
};
void CheckResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
int64_t index = 0;
for(auto& result : topk_query_result_array) {
auto result_id = result.query_result_arrays[0].id;
auto search_id = search_record_array[index++].first;
if(result_id != search_id) {
std::cout << "The top 1 result is wrong: " << result_id
<< " vs. " << search_id << std::endl;
} else {
std::cout << "Check result sucessfully" << std::endl;
}
}
BLOCK_SPLITER
}
auto start = std::chrono::high_resolution_clock::now();
for (auto i = 0; i < 5; ++i) {
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
}
}
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
void DoSearch(std::shared_ptr<Connection> conn,
const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::string& phase_name) {
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate(1);
query_range_array.emplace_back(rg);
std::vector<RowRecord> record_array;
for(auto& pair : search_record_array) {
record_array.push_back(pair.second);
}
// PrintSearchResult(search_record_array, topk_query_result_array);
// CheckResult(search_record_array, topk_query_result_array);
auto start = std::chrono::high_resolution_clock::now();
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
}
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
PrintSearchResult(search_record_array, topk_query_result_array);
CheckResult(search_record_array, topk_query_result_array);
}
}
void
......@@ -216,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
int64_t row_count = 0;
// conn->DropTable(table);
stat = conn->CountTable(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
conn->DropTable(table);
// stat = conn->CountTable(table, row_count);
// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
}
}
......@@ -273,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
if(search_record_array.size() < NQ) {
search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
}
}
}
......@@ -345,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::string status = conn->ServerStatus();
std::cout << "Server status after disconnect: " << status << std::endl;
}
}
\ No newline at end of file
}
......@@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id";
static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader";
static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor";
static const char* CONFIG_RESOURCE_CONNECTIONS = "connections";
static const char* CONFIG_SPEED_CONNECTIONS = "speed";
static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections";
class ServerConfig {
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
#include "scheduler/resource/Resource.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
class AlgorithmTest : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, true);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE = Connection("PCIE", 11.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE);
res_mgr_->Connect("cpu2", "gpu1", PCIE);
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
};
TEST_F(AlgorithmTest, ShortestPath_test) {
std::vector<std::string> sp;
uint64_t cost;
cost = ShortestPath(disk_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(disk_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_2_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
}
}
}
}
\ No newline at end of file
......@@ -30,7 +30,7 @@ protected:
resources_.push_back(gpu_resource_);
auto subscriber = [&](EventPtr event) {
if (event->Type() == EventType::COPY_COMPLETED) {
if (event->Type() == EventType::LOAD_COMPLETED) {
std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_;
cv_.notify_one();
......
......@@ -13,6 +13,7 @@
#include "scheduler/resource/Resource.h"
#include "utils/Error.h"
#include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace zilliz {
namespace milvus {
......@@ -122,9 +123,6 @@ protected:
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
uint64_t load_count_ = 0;
std::mutex load_mutex_;
std::condition_variable cv_;
};
void
......@@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) {
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
class SchedulerTest2 : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE1 = Connection("PCIE", 11.0);
auto PCIE2 = Connection("PCIE", 20.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE1);
res_mgr_->Connect("cpu2", "gpu1", PCIE2);
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
res_mgr_->Start();
scheduler_->Start();
}
void
TearDown() override {
scheduler_->Stop();
res_mgr_->Stop();
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
};
TEST_F(SchedulerTest2, SpecifiedResourceTest) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
for (uint64_t i = 0; i < NUM; ++i) {
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<SpecResLabel>(disk_);
tasks.push_back(task);
disk_.lock()->task_table().Put(task);
}
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册