提交 de2b5c85 编写于 作者: P peng.xu

Merge branch 'dev_resource' into 'branch-0.4.0'

MS-337 dev basic Resource

See merge request megasearch/milvus!365

Former-commit-id: 543824c04cf992e6a4030c27f2b2a8d9dac4329b
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "CpuResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
void CpuResource::LoadFile(TaskPtr task) {
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
}
void CpuResource::Process(TaskPtr task) {
}
}
}
}
\ No newline at end of file
...@@ -17,29 +17,14 @@ namespace engine { ...@@ -17,29 +17,14 @@ namespace engine {
class CpuResource : public Resource { class CpuResource : public Resource {
public: public:
explicit explicit
CpuResource(std::string name) CpuResource(std::string name);
: Resource(std::move(name), ResourceType::CPU) {}
protected: protected:
void void
LoadFile(TaskPtr task) override { LoadFile(TaskPtr task) override;
// if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
// } else if (src.type == CPU) {
// memcpy(src, dest, len);
// } else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
// } else {
// // unknown type, exception
// }
}
void void
Process(TaskPtr task) override { Process(TaskPtr task) override;
task->Execute();
}
}; };
} }
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DiskResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {}
void DiskResource::LoadFile(TaskPtr task) {
}
void DiskResource::Process(TaskPtr task) {
}
}
}
}
...@@ -16,15 +16,14 @@ namespace engine { ...@@ -16,15 +16,14 @@ namespace engine {
class DiskResource : public Resource { class DiskResource : public Resource {
public: public:
explicit explicit
DiskResource(std::string name) DiskResource(std::string name);
: Resource(std::move(name), ResourceType::DISK) {}
protected: protected:
void void
LoadFile(TaskPtr task) override {} LoadFile(TaskPtr task) override;
void void
Process(TaskPtr task) override {} Process(TaskPtr task) override;
}; };
} }
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GpuResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
void GpuResource::LoadFile(TaskPtr task) {
}
void GpuResource::Process(TaskPtr task) {
}
}
}
}
...@@ -16,15 +16,14 @@ namespace engine { ...@@ -16,15 +16,14 @@ namespace engine {
class GpuResource : public Resource { class GpuResource : public Resource {
public: public:
explicit explicit
GpuResource(std::string name) GpuResource(std::string name);
: Resource(std::move(name), ResourceType::GPU) {}
protected: protected:
void void
LoadFile(TaskPtr task) override {} LoadFile(TaskPtr task) override;
void void
Process(TaskPtr task) override {} Process(TaskPtr task) override;
}; };
} }
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <atomic>
#include "Node.h"
namespace zilliz {
namespace milvus {
namespace engine {
Node::Node() {
static std::atomic_uint_fast8_t counter(0);
id_ = counter++;
}
void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
neighbours_.erase(search);
}
}
}
bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
return true;
}
}
return false;
}
std::vector<Neighbour> Node::GetNeighbours() {
std::lock_guard<std::mutex> lk(mutex_);
std::vector<Neighbour> ret;
for (auto &e : neighbours_) {
ret.push_back(e.second);
}
return ret;
}
std::string Node::Dump() {
// TODO(linxj): what's that?
return std::__cxx11::string();
}
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
Neighbour neighbour(neighbour_node, connection);
neighbours_[s->id_] = neighbour;
}
// else do nothing, consider it..
}
}
}
}
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <map>
#include "../TaskTable.h" #include "../TaskTable.h"
#include "Connection.h" #include "Connection.h"
...@@ -28,29 +29,31 @@ struct Neighbour { ...@@ -28,29 +29,31 @@ struct Neighbour {
Connection connection; Connection connection;
}; };
// TODO(linxj): return type void -> Status
class Node { class Node {
public: public:
Node();
void void
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection);
Neighbour neighbour(neighbour_node, connection);
neighbours_.emplace_back(neighbour);
}
void void
DelNeighbour(NeighbourNodePtr neighbour_ptr) {} DelNeighbour(const NeighbourNodePtr &neighbour_ptr);
bool bool
IsNeighbour(NeighbourNodePtr neighbour_ptr) {} IsNeighbour(const NeighbourNodePtr& neighbour_ptr);
const std::vector<Neighbour> & std::vector<Neighbour>
GetNeighbours() {} GetNeighbours();
public: public:
std::string std::string
Dump(); Dump();
private: private:
std::vector<Neighbour> neighbours_; std::mutex mutex_;
uint8_t id_;
std::map<uint8_t, Neighbour> neighbours_;
}; };
using NodePtr = std::shared_ptr<Node>; using NodePtr = std::shared_ptr<Node>;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class RegisterHandler {
public:
virtual void Exec() = 0;
};
using RegisterHandlerPtr = std::shared_ptr<RegisterHandler>;
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
Resource::Resource(std::string name, ResourceType type)
: name_(std::move(name)),
type_(type),
running_(false),
load_flag_(false),
exec_flag_(false) {
}
void Resource::Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
}
void Resource::Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
}
TaskTable &Resource::task_table() {
return task_table_;
}
void Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
void Resource::WakeupLoader() {
load_cv_.notify_one();
}
TaskPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
TaskPtr Resource::pick_task_execute() {
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
void Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
}
}
}
void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
}
}
}
RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) {
// construct object each time.
return register_table_[type]();
}
}
}
}
\ No newline at end of file
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
#include "../TaskTable.h" #include "../TaskTable.h"
#include "../task/Task.h" #include "../task/Task.h"
#include "../Cost.h" #include "../Cost.h"
#include "Node.h"
#include "Connection.h" #include "Connection.h"
#include "Node.h"
#include "RegisterHandler.h"
namespace zilliz { namespace zilliz {
...@@ -29,92 +30,50 @@ enum class ResourceType { ...@@ -29,92 +30,50 @@ enum class ResourceType {
GPU = 2 GPU = 2
}; };
class Resource : public Node { enum class RegisterType {
public: START_UP,
void ON_FINISH_TASK,
Start() { ON_COPY_COMPLETED,
loader_thread_ = std::thread(&Resource::loader_function, this); ON_TASK_TABLE_UPDATED,
executor_thread_ = std::thread(&Resource::executor_function, this); };
}
void
Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
}
TaskTable &
task_table() {
return task_table_;
}
class Resource : public Node {
public: public:
/* /*
* wake up executor; * Event function MUST be a short function, never blocking;
*/
void
WakeupExecutor() {
exec_cv_.notify_one();
}
/*
* wake up loader;
*/ */
void template <typename T>
WakeupLoader() { void Register_T(const RegisterType& type) {
load_cv_.notify_one(); register_table_.emplace(type, [] { return std::make_shared<T>(); });
} }
public: RegisterHandlerPtr
/* GetRegisterFunc(const RegisterType& type);
* Event function MUST be a short function, never blocking;
*/
/*
* Register on start up event;
*/
void void
RegisterOnStartUp(std::function<void(void)> func) { Start();
on_start_up_ = func;
}
/*
* Register on finish one task event;
*/
void void
RegisterOnFinishTask(std::function<void(void)> func) { Stop();
on_finish_task_ = func;
}
TaskTable &
task_table();
public:
/* /*
* Register on copy task data completed event; * wake up executor;
*/ */
void void
RegisterOnCopyCompleted(std::function<void(void)> func) { WakeupExecutor();
on_copy_completed_ = func;
}
/* /*
* Register on task table updated event; * wake up loader;
*/ */
void void
RegisterOnTaskTableUpdated(std::function<void(void)> func) { WakeupLoader();
on_task_table_updated_ = func;
}
protected: protected:
Resource(std::string name, ResourceType type) Resource(std::string name, ResourceType type);
: name_(std::move(name)),
type_(type),
on_start_up_(nullptr),
on_finish_task_(nullptr),
on_copy_completed_(nullptr),
on_task_table_updated_(nullptr),
running_(false),
load_flag_(false),
exec_flag_(false) {
}
// TODO: SearchContextPtr to TaskPtr // TODO: SearchContextPtr to TaskPtr
/* /*
...@@ -142,67 +101,27 @@ private: ...@@ -142,67 +101,27 @@ private:
* Order by start time; * Order by start time;
*/ */
TaskPtr TaskPtr
pick_task_load() { pick_task_load();
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
/* /*
* Pick one task to execute; * Pick one task to execute;
* Pick by start time and priority; * Pick by start time and priority;
*/ */
TaskPtr TaskPtr
pick_task_execute() { pick_task_execute();
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
private: private:
/* /*
* Only called by load thread; * Only called by load thread;
*/ */
void void
loader_function() { loader_function();
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
on_copy_completed_();
}
}
}
/* /*
* Only called by worker thread; * Only called by worker thread;
*/ */
void void
executor_function() { executor_function();
on_start_up_();
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
on_finish_task_();
}
}
}
private: private:
...@@ -211,10 +130,7 @@ private: ...@@ -211,10 +130,7 @@ private:
TaskTable task_table_; TaskTable task_table_;
std::function<void(void)> on_start_up_; std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(void)> on_finish_task_;
std::function<void(void)> on_copy_completed_;
std::function<void(void)> on_task_table_updated_;
bool running_; bool running_;
std::thread loader_thread_; std::thread loader_thread_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册