提交 6c62fb09 编写于 作者: Y Yu Kun

Add new API for PathChoice


Former-commit-id: 324ae928a548266d891f18373dd78148b1e52499
上级 6521cd02
......@@ -34,7 +34,7 @@ else()
endif()
message(STATUS "Build type = ${BUILD_TYPE}")
#add_definitions(-DNEW_SCHEDULER)
add_definitions(-DNEW_SCHEDULER)
project(milvus VERSION "${MILVUS_VERSION}")
project(milvus_engine LANGUAGES CUDA CXX)
......
......@@ -54,7 +54,7 @@ set(grpc_service_files
grpc/gen-milvus/milvus.pb.cc
grpc/gen-status/status.grpc.pb.cc
grpc/gen-status/status.pb.cc
)
scheduler/Utils.h)
set(db_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::vector<std::string>
ShortestPath(const ResourcePtr &src, const ResourcePtr& dest) {
auto node = std::static_pointer_cast<Node>(src);
auto neighbours = node->GetNeighbours();
for (auto &neighbour : neighbours) {
neighbour.connection.speed()
}
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "resource/Resource.h"
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
std::vector<std::string>
ShortestPath(const ResourcePtr &src, const ResourcePtr& dest);
}
}
}
\ No newline at end of file
......@@ -28,6 +28,11 @@ ResourceMgr::GetNumOfComputeResource() {
return count;
}
std::vector<ResourcePtr>
ResourceMgr::GetComputeResource() {
// TODO
}
uint64_t
ResourceMgr::GetNumGpuResource() const {
uint64_t num = 0;
......
......@@ -47,6 +47,9 @@ public:
uint64_t
GetNumOfComputeResource();
std::vector<ResourcePtr>
GetComputeResource();
/*
* Add resource into Resource Management;
* Generate functions on events;
......
......@@ -8,6 +8,7 @@
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
#include "Algorithm.h"
namespace zilliz {
......@@ -137,6 +138,39 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
}
break;
}
case TaskLabelType::SPECIAL_RESOURCE: {
auto self = event->resource_.lock();
// if this resource is disk, assign it to smallest cost resource
if (self->Type() == ResourceType::DISK) {
// step 1:
// calculate shortest path per resource, from disk to compute resource
// calculate by transport_cost
auto compute_resources = res_mgr_.lock()->GetComputeResource();
std::vector<std::vector<std::string>> paths;
for (auto res : compute_resources) {
std::vector<std::string> path = ShortestPath(self, res);
paths.emplace_back(path);
}
// step 2:
// select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
std::vector<uint64_t> costs;
for (auto res : compute_resources) {
uint64_t cost = res->TaskAvgCost() * res->NumOfTaskToExec() + transport_cost;
costs.emplace_back(cost);
}
path, cost
// step 3:
// set path in task
}
// do or move
auto load_event = std::static_pointer_cast<CopyCompletedEvent>(event);
auto path = (load_event->task_table_item_->task->Path);
break;
}
case TaskLabelType::BROADCAST: {
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Utils.h"
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_now_timestamp(); {
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <cstdint>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_now_timestamp();
}
}
}
\ No newline at end of file
......@@ -19,12 +19,12 @@ public:
: name_(std::move(name)), speed_(speed) {}
const std::string &
get_name() const {
name() const {
return name_;
}
const double
get_speed() const {
uint64_t
speed() const {
return speed_;
}
......@@ -38,7 +38,7 @@ public:
private:
std::string name_;
double speed_;
uint64_t speed_;
};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "../Utils.h"
#include "Resource.h"
......@@ -139,7 +140,13 @@ void Resource::executor_function() {
if (task_item == nullptr) {
break;
}
auto start = get_now_timestamp();
Process(task_item->task);
auto finish = get_now_timestamp();
++total_task_;
total_cost_ += finish - start;
task_item->Executed();
if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
......
......@@ -44,7 +44,7 @@ enum class RegisterType {
};
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
public:
/*
* Start loader and executor if enable;
*/
......@@ -69,7 +69,7 @@ public:
void
WakeupExecutor();
public:
public:
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
......@@ -110,6 +110,22 @@ public:
return enable_executor_;
}
// TODO: const
uint64_t
NumOfTaskToExec() {
uint64_t count = 0;
for (auto &task : task_table_) {
if (task->state == TaskTableItemState::LOADED) ++count;
}
return count;
}
// TODO: need double ?
inline uint64_t
TaskAvgCost() const {
return total_cost_ / total_task_;
}
TaskTable &
task_table();
......@@ -120,7 +136,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
protected:
protected:
Resource(std::string name,
ResourceType type,
uint64_t device_id,
......@@ -142,7 +158,7 @@ protected:
virtual void
Process(TaskPtr task) = 0;
private:
private:
/*
* These function should move to cost.h ???
* COST.H ???
......@@ -162,7 +178,7 @@ private:
TaskTableItemPtr
pick_task_execute();
private:
private:
/*
* Only called by load thread;
*/
......@@ -175,14 +191,17 @@ private:
void
executor_function();
protected:
protected:
uint64_t device_id_;
std::string name_;
private:
private:
ResourceType type_;
TaskTable task_table_;
uint64_t total_cost_ = 0;
uint64_t total_task_ = 0;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr;
......
......@@ -44,6 +44,14 @@ public:
inline TaskType
Type() const { return type_; }
/*
* Transport path;
*/
inline std::vector<std::string>&
path() {
return path_;
}
/*
* Getter and Setter;
*/
......@@ -64,6 +72,7 @@ public:
Clone() = 0;
public:
std::vector<std::string> path_;
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
TaskType type_;
......
......@@ -24,8 +24,8 @@ namespace {
constexpr int64_t TABLE_DIMENSION = 512;
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
constexpr int64_t BATCH_ROW_COUNT = 1000000;
constexpr int64_t NQ = 100;
constexpr int64_t TOP_K = 10;
constexpr int64_t NQ = 10;
constexpr int64_t TOP_K = 1000;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
......@@ -177,14 +177,17 @@ namespace {
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array);
Status stat = conn->Search("zilliz_face", record_array, query_range_array, TOP_K, 10, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
}
if (i == 0) {
PrintSearchResult(search_record_array, topk_query_result_array);
}
}
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
// PrintSearchResult(search_record_array, topk_query_result_array);
// CheckResult(search_record_array, topk_query_result_array);
}
}
......@@ -284,7 +287,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
int64_t row_count = 0;
Status stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
DoSearch(conn, search_record_array, "Search without index");
// DoSearch(conn, search_record_array, "Search without index");
}
{//wait unit build index finish
......@@ -308,7 +311,19 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
{//search vectors after build index finish
DoSearch(conn, search_record_array, "Search after build index finish");
std::vector<std::pair<int64_t, RowRecord>> search_array;
std::vector<RowRecord> row_record_array;
row_record_array.resize(NQ);
for (int64_t i = 0; i < NQ; ++i) {
row_record_array[i].data.resize(TABLE_DIMENSION);
for (auto j = 0; j < TABLE_DIMENSION; ++j) {
row_record_array[i].data[j] = 1;
}
search_array.push_back(std::make_pair(i, row_record_array[i]));
}
DoSearch(conn, search_array, "Search after build index finish");
// std::cout << conn->DumpTaskTables() << std::endl;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册