提交 cdb6f96d 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-408 Add device_id in resource construct function

See merge request megasearch/milvus!415

Former-commit-id: e42d3bb4bc319191bd11690644d69bc57bc6c12b
......@@ -44,6 +44,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task done avoid memory increment continues
- MS-405 - Add delete task support
- MS-408 - Add device_id in resource construct function
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -13,15 +13,16 @@ namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name,
const std::string &alias,
const std::string &type,
uint64_t device_id,
bool enable_loader,
bool enable_executor) {
if (name == "disk") {
return std::make_shared<DiskResource>(alias, enable_loader, enable_executor);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias, enable_loader, enable_executor);
} else if (name == "gpu") {
return std::make_shared<GpuResource>(alias, enable_loader, enable_executor);
if (type == "DISK") {
return std::make_shared<DiskResource>(name, device_id, enable_loader, enable_executor);
} else if (type == "CPU") {
return std::make_shared<CpuResource>(name, device_id, enable_loader, enable_executor);
} else if (type == "GPU") {
return std::make_shared<GpuResource>(name, device_id, enable_loader, enable_executor);
} else {
return nullptr;
}
......
......@@ -22,7 +22,8 @@ class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name,
const std::string &alias = "",
const std::string &type,
uint64_t device_id,
bool enable_loader = true,
bool enable_executor = true);
};
......
......@@ -48,6 +48,16 @@ ResourceMgr::Add(ResourcePtr &&resource) {
return ret;
}
void
ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
auto res1 = get_resource_by_name(name1);
auto res2 = get_resource_by_name(name2);
if (res1 && res2) {
res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
}
}
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
if (auto observe_a = res1.lock()) {
......@@ -116,6 +126,16 @@ ResourceMgr::DumpTaskTables() {
return ss.str();
}
ResourcePtr
ResourceMgr::get_resource_by_name(const std::string &name) {
for (auto &res : resources_) {
if (res->Name() == name) {
return res;
}
}
return nullptr;
}
void
ResourceMgr::event_process() {
while (running_) {
......
......@@ -49,6 +49,9 @@ public:
ResourceWPtr
Add(ResourcePtr &&resource);
void
Connect(const std::string &res1, const std::string &res2, Connection &connection);
/*
* Create connection between A and B;
*/
......@@ -80,6 +83,9 @@ public:
DumpTaskTables();
private:
ResourcePtr
get_resource_by_name(const std::string &name);
void
event_process();
......
......@@ -48,6 +48,7 @@ ToString(const TaskTimestamp &timestamp) {
ss << ", executed=" << timestamp.executed;
ss << ", move=" << timestamp.move;
ss << ", moved=" << timestamp.moved;
ss << ", finish=" << timestamp.finish;
ss << ">";
return ss.str();
}
......@@ -92,6 +93,7 @@ TaskTableItem::Executed() {
state = TaskTableItemState::EXECUTED;
lock.unlock();
timestamp.executed = get_now_timestamp();
timestamp.finish = get_now_timestamp();
return true;
}
return false;
......@@ -114,6 +116,7 @@ TaskTableItem::Moved() {
state = TaskTableItemState::MOVED;
lock.unlock();
timestamp.moved = get_now_timestamp();
timestamp.finish = get_now_timestamp();
return true;
}
return false;
......
......@@ -36,6 +36,7 @@ struct TaskTimestamp {
uint64_t loaded = 0;
uint64_t execute = 0;
uint64_t executed = 0;
uint64_t finish = 0;
};
struct TaskTableItem {
......
......@@ -20,7 +20,7 @@ next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
}
}
// TODO: this function called with only on tasks, so it will always push task to first neighbour
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
......@@ -31,7 +31,7 @@ push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighb
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
task = task->Clone();
// task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
return out;
}
CpuResource::CpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, enable_loader, enable_executor) {}
CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {}
void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
......
......@@ -17,7 +17,7 @@ namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name, bool enable_loader, bool enable_executor);
CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -15,8 +15,8 @@ std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
return out;
}
DiskResource::DiskResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, enable_loader, enable_executor) {
DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class DiskResource : public Resource {
public:
explicit
DiskResource(std::string name, bool enable_loader, bool enable_executor);
DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -16,11 +16,11 @@ std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
return out;
}
GpuResource::GpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, enable_loader, enable_executor) {}
GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
task->Load(LoadType::CPU2GPU, device_id_);
}
void GpuResource::Process(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class GpuResource : public Resource {
public:
explicit
GpuResource(std::string name, bool enable_loader, bool enable_executor);
GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -18,10 +18,12 @@ std::ostream &operator<<(std::ostream &out, const Resource &resource) {
Resource::Resource(std::string name,
ResourceType type,
uint64_t device_id,
bool enable_loader,
bool enable_executor)
: name_(std::move(name)),
type_(type),
device_id_(device_id),
running_(false),
enable_loader_(enable_loader),
enable_executor_(enable_executor),
......
......@@ -82,6 +82,11 @@ public:
subscriber_ = std::move(subscriber);
}
inline std::string
Name() const {
return name_;
}
inline ResourceType
Type() const {
return type_;
......@@ -112,6 +117,7 @@ public:
protected:
Resource(std::string name,
ResourceType type,
uint64_t device_id,
bool enable_loader,
bool enable_executor);
......@@ -163,6 +169,9 @@ private:
void
executor_function();
protected:
uint64_t device_id_;
private:
std::string name_;
ResourceType type_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册