提交 cbc734d8 编写于 作者: X xj.lin 提交者: wxyu

MS-337 dev basic Resource


Former-commit-id: 7486415fa96cd3db77320aeb4b01dd78a2b4c878
上级 94001015
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "CpuResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
void CpuResource::LoadFile(TaskPtr task) {
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
}
void CpuResource::Process(TaskPtr task) {
}
}
}
}
\ No newline at end of file
......@@ -17,29 +17,14 @@ namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
CpuResource(std::string name);
protected:
void
LoadFile(TaskPtr task) override {
// if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
// } else if (src.type == CPU) {
// memcpy(src, dest, len);
// } else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
// } else {
// // unknown type, exception
// }
}
LoadFile(TaskPtr task) override;
void
Process(TaskPtr task) override {
task->Execute();
}
Process(TaskPtr task) override;
};
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DiskResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {}
void DiskResource::LoadFile(TaskPtr task) {
}
void DiskResource::Process(TaskPtr task) {
}
}
}
}
......@@ -16,15 +16,14 @@ namespace engine {
class DiskResource : public Resource {
public:
explicit
DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {}
DiskResource(std::string name);
protected:
void
LoadFile(TaskPtr task) override {}
LoadFile(TaskPtr task) override;
void
Process(TaskPtr task) override {}
Process(TaskPtr task) override;
};
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GpuResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
void GpuResource::LoadFile(TaskPtr task) {
}
void GpuResource::Process(TaskPtr task) {
}
}
}
}
......@@ -16,15 +16,14 @@ namespace engine {
class GpuResource : public Resource {
public:
explicit
GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
GpuResource(std::string name);
protected:
void
LoadFile(TaskPtr task) override {}
LoadFile(TaskPtr task) override;
void
Process(TaskPtr task) override {}
Process(TaskPtr task) override;
};
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <atomic>
#include "Node.h"
namespace zilliz {
namespace milvus {
namespace engine {
Node::Node() {
static std::atomic_uint_fast8_t counter(0);
id_ = counter++;
}
void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
neighbours_.erase(search);
}
}
}
bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
return true;
}
}
return false;
}
std::vector<Neighbour> Node::GetNeighbours() {
std::lock_guard<std::mutex> lk(mutex_);
std::vector<Neighbour> ret;
for (auto &e : neighbours_) {
ret.push_back(e.second);
}
return ret;
}
std::string Node::Dump() {
// TODO(linxj): what's that?
return std::__cxx11::string();
}
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
Neighbour neighbour(neighbour_node, connection);
neighbours_[s->id_] = neighbour;
}
// else do nothing, consider it..
}
}
}
}
......@@ -7,6 +7,7 @@
#include <vector>
#include <memory>
#include <map>
#include "../TaskTable.h"
#include "Connection.h"
......@@ -28,29 +29,31 @@ struct Neighbour {
Connection connection;
};
// TODO(linxj): return type void -> Status
class Node {
public:
Node();
void
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
Neighbour neighbour(neighbour_node, connection);
neighbours_.emplace_back(neighbour);
}
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection);
void
DelNeighbour(NeighbourNodePtr neighbour_ptr) {}
DelNeighbour(const NeighbourNodePtr &neighbour_ptr);
bool
IsNeighbour(NeighbourNodePtr neighbour_ptr) {}
IsNeighbour(const NeighbourNodePtr& neighbour_ptr);
const std::vector<Neighbour> &
GetNeighbours() {}
std::vector<Neighbour>
GetNeighbours();
public:
std::string
Dump();
private:
std::vector<Neighbour> neighbours_;
std::mutex mutex_;
uint8_t id_;
std::map<uint8_t, Neighbour> neighbours_;
};
using NodePtr = std::shared_ptr<Node>;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class RegisterHandler {
public:
virtual void Exec() = 0;
};
using RegisterHandlerPtr = std::shared_ptr<RegisterHandler>;
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
Resource::Resource(std::string name, ResourceType type)
: name_(std::move(name)),
type_(type),
running_(false),
load_flag_(false),
exec_flag_(false) {
}
void Resource::Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
}
void Resource::Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
}
TaskTable &Resource::task_table() {
return task_table_;
}
void Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
void Resource::WakeupLoader() {
load_cv_.notify_one();
}
TaskPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
TaskPtr Resource::pick_task_execute() {
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
void Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
}
}
}
void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
}
}
}
RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) {
// construct object each time.
return register_table_[type]();
}
}
}
}
\ No newline at end of file
......@@ -15,8 +15,9 @@
#include "../TaskTable.h"
#include "../task/Task.h"
#include "../Cost.h"
#include "Node.h"
#include "Connection.h"
#include "Node.h"
#include "RegisterHandler.h"
namespace zilliz {
......@@ -29,92 +30,50 @@ enum class ResourceType {
GPU = 2
};
enum class RegisterType {
START_UP,
ON_FINISH_TASK,
ON_COPY_COMPLETED,
ON_TASK_TABLE_UPDATED,
};
class Resource : public Node {
public:
void
Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
/*
* Event function MUST be a short function, never blocking;
*/
template <typename T>
void Register_T(const RegisterType& type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
}
RegisterHandlerPtr
GetRegisterFunc(const RegisterType& type);
void
Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
}
Start();
void
Stop();
TaskTable &
task_table() {
return task_table_;
}
task_table();
public:
/*
* wake up executor;
*/
void
WakeupExecutor() {
exec_cv_.notify_one();
}
WakeupExecutor();
/*
* wake up loader;
*/
void
WakeupLoader() {
load_cv_.notify_one();
}
public:
/*
* Event function MUST be a short function, never blocking;
*/
/*
* Register on start up event;
*/
void
RegisterOnStartUp(std::function<void(void)> func) {
on_start_up_ = func;
}
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask(std::function<void(void)> func) {
on_finish_task_ = func;
}
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(void)> func) {
on_copy_completed_ = func;
}
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(void)> func) {
on_task_table_updated_ = func;
}
WakeupLoader();
protected:
Resource(std::string name, ResourceType type)
: name_(std::move(name)),
type_(type),
on_start_up_(nullptr),
on_finish_task_(nullptr),
on_copy_completed_(nullptr),
on_task_table_updated_(nullptr),
running_(false),
load_flag_(false),
exec_flag_(false) {
}
Resource(std::string name, ResourceType type);
// TODO: SearchContextPtr to TaskPtr
/*
......@@ -142,67 +101,27 @@ private:
* Order by start time;
*/
TaskPtr
pick_task_load() {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
pick_task_load();
/*
* Pick one task to execute;
* Pick by start time and priority;
*/
TaskPtr
pick_task_execute() {
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
}
pick_task_execute();
private:
/*
* Only called by load thread;
*/
void
loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
on_copy_completed_();
}
}
}
loader_function();
/*
* Only called by worker thread;
*/
void
executor_function() {
on_start_up_();
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
on_finish_task_();
}
}
}
executor_function();
private:
......@@ -211,10 +130,7 @@ private:
TaskTable task_table_;
std::function<void(void)> on_start_up_;
std::function<void(void)> on_finish_task_;
std::function<void(void)> on_copy_completed_;
std::function<void(void)> on_task_table_updated_;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
bool running_;
std::thread loader_thread_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册