提交 bc8e8981 编写于 作者: G groot

format scheduler code


Former-commit-id: 0336960c0a871bbb835f31eb53945603bd2fcf05
上级 d748ea74
......@@ -16,20 +16,23 @@
// under the License.
#include "Algorithm.h"
#include "scheduler/Algorithm.h"
#include <limits>
#include <unordered_map>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
constexpr uint64_t MAXINT = std::numeric_limits<uint32_t >::max();
constexpr uint64_t MAXINT = std::numeric_limits<uint32_t>::max();
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResources().size();
......@@ -53,7 +56,6 @@ ShortestPath(const ResourcePtr &src,
std::vector<bool> vis(num_of_resources, false);
std::vector<uint64_t> dis(num_of_resources, MAXINT);
for (auto &res : res_mgr->GetAllResources()) {
auto cur_node = std::static_pointer_cast<Node>(res);
auto cur_neighbours = cur_node->GetNeighbours();
......@@ -105,6 +107,6 @@ ShortestPath(const ResourcePtr &src,
return dis[name_id_map.at(dest->name())];
}
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -30,8 +30,8 @@ uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string>& path);
std::vector<std::string> &path);
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -30,7 +30,6 @@
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngine.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -43,6 +42,6 @@ using EngineFactory = engine::EngineFactory;
using EngineType = engine::EngineType;
using MetricType = engine::MetricType;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,19 +15,19 @@
// specific language governing permissions and limitations
// under the License.
#include "JobMgr.h"
#include "scheduler/JobMgr.h"
#include "task/Task.h"
#include "TaskCreator.h"
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
using namespace engine;
JobMgr::JobMgr(ResourceMgrPtr res_mgr)
: res_mgr_(std::move(res_mgr)) {}
: res_mgr_(std::move(res_mgr)) {
}
void
JobMgr::Start() {
......@@ -59,7 +59,9 @@ void
JobMgr::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); });
cv_.wait(lock, [this] {
return !queue_.empty();
});
auto job = queue_.front();
queue_.pop();
lock.unlock();
......@@ -84,6 +86,6 @@ JobMgr::build_task(const JobPtr &job) {
return TaskCreator::Create(job);
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -31,15 +31,13 @@
#include "task/Task.h"
#include "ResourceMgr.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class JobMgr {
public:
explicit
JobMgr(ResourceMgrPtr res_mgr);
public:
explicit JobMgr(ResourceMgrPtr res_mgr);
void
Start();
......@@ -47,18 +45,18 @@ public:
void
Stop();
public:
public:
void
Put(const JobPtr &job);
private:
private:
void
worker_function();
std::vector<TaskPtr>
build_task(const JobPtr &job);
private:
private:
bool running_ = false;
std::queue<JobPtr> queue_;
......@@ -72,6 +70,6 @@ private:
using JobMgrPtr = std::shared_ptr<JobMgr>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,8 +16,7 @@
// under the License.
#include "ResourceFactory.h"
#include "scheduler/ResourceFactory.h"
namespace zilliz {
namespace milvus {
......@@ -40,6 +39,6 @@ ResourceFactory::Create(const std::string &name,
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -25,13 +25,12 @@
#include "resource/GpuResource.h"
#include "resource/DiskResource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class ResourceFactory {
public:
public:
static std::shared_ptr<Resource>
Create(const std::string &name,
const std::string &type,
......@@ -40,8 +39,6 @@ public:
bool enable_executor = true);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,15 +16,13 @@
// specific language governing permissions and limitations
// under the License.
#include "ResourceMgr.h"
#include "scheduler/ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
void
ResourceMgr::Start() {
std::lock_guard<std::mutex> lck(resources_mutex_);
......@@ -186,7 +184,9 @@ void
ResourceMgr::event_process() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
event_cv_.wait(lock, [this] {
return !queue_.empty();
});
auto event = queue_.front();
queue_.pop();
......@@ -201,6 +201,6 @@ ResourceMgr::event_process() {
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,21 +22,21 @@
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <condition_variable>
#include "resource/Resource.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class ResourceMgr {
public:
public:
ResourceMgr() = default;
public:
public:
/******** Management Interface ********/
void
Start();
......@@ -58,7 +58,7 @@ public:
subscriber_ = std::move(subscriber);
}
public:
public:
/******** Management Interface ********/
inline std::vector<ResourceWPtr> &
GetDiskResources() {
......@@ -89,10 +89,10 @@ public:
uint64_t
GetNumGpuResource() const;
public:
public:
// TODO: add stats interface(low)
public:
public:
/******** Utility Functions ********/
std::string
Dump();
......@@ -100,14 +100,14 @@ public:
std::string
DumpTaskTables();
private:
private:
void
post_event(const EventPtr &event);
void
event_process();
private:
private:
bool running_ = false;
std::vector<ResourceWPtr> disk_resources_;
......@@ -120,13 +120,11 @@ private:
std::condition_variable event_cv_;
std::thread worker_thread_;
};
using ResourceMgrPtr = std::shared_ptr<ResourceMgr>;
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,12 +16,16 @@
// under the License.
#include "SchedInst.h"
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "ResourceFactory.h"
#include "knowhere/index/vector_index/IndexGPUIVF.h"
#include "Utils.h"
#include <vector>
#include <set>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
......@@ -165,6 +169,7 @@ StopSchedulerService() {
SchedInst::GetInstance()->Stop();
ResMgrInst::GetInstance()->Stop();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -24,13 +24,12 @@
#include <mutex>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class ResMgrInst {
public:
public:
static ResourceMgrPtr
GetInstance() {
if (instance == nullptr) {
......@@ -42,13 +41,13 @@ public:
return instance;
}
private:
private:
static ResourceMgrPtr instance;
static std::mutex mutex_;
};
class SchedInst {
public:
public:
static SchedulerPtr
GetInstance() {
if (instance == nullptr) {
......@@ -60,13 +59,13 @@ public:
return instance;
}
private:
private:
static SchedulerPtr instance;
static std::mutex mutex_;
};
class JobMgrInst {
public:
public:
static scheduler::JobMgrPtr
GetInstance() {
if (instance == nullptr) {
......@@ -78,7 +77,7 @@ public:
return instance;
}
private:
private:
static scheduler::JobMgrPtr instance;
static std::mutex mutex_;
};
......@@ -89,6 +88,6 @@ StartSchedulerService();
void
StopSchedulerService();
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
#include "src/cache/GpuCacheMgr.h"
#include "scheduler/Scheduler.h"
#include "cache/GpuCacheMgr.h"
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "action/Action.h"
#include "Algorithm.h"
#include <utility>
namespace zilliz {
namespace milvus {
......@@ -43,7 +43,6 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
}
void
Scheduler::Start() {
running_ = true;
......@@ -79,7 +78,9 @@ void
Scheduler::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
event_cv_.wait(lock, [this] {
return !event_queue_.empty();
});
auto event = event_queue_.front();
event_queue_.pop();
if (event == nullptr) {
......@@ -142,6 +143,6 @@ Scheduler::OnTaskTableUpdated(const EventPtr &event) {
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,22 +22,20 @@
#include <mutex>
#include <thread>
#include <queue>
#include <unordered_map>
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
// TODO: refactor, not friendly to unittest, logical in framework code
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr);
public:
explicit Scheduler(ResourceMgrWPtr res_mgr);
Scheduler(const Scheduler &) = delete;
Scheduler(Scheduler &&) = delete;
......@@ -66,7 +64,7 @@ public:
std::string
Dump();
private:
private:
/******** Events ********/
/*
......@@ -106,7 +104,7 @@ private:
void
OnTaskTableUpdated(const EventPtr &event);
private:
private:
/*
* Dispatch event to event handler;
*/
......@@ -119,7 +117,7 @@ private:
void
worker_function();
private:
private:
bool running_;
std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_;
......@@ -133,7 +131,6 @@ private:
using SchedulerPtr = std::shared_ptr<Scheduler>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include <src/scheduler/tasklabel/BroadcastLabel.h>
#include "TaskCreator.h"
#include "scheduler/TaskCreator.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -64,8 +63,6 @@ TaskCreator::Create(const DeleteJobPtr &job) {
return tasks;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -34,17 +34,16 @@
#include "task/SearchTask.h"
#include "task/DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class TaskCreator {
public:
public:
static std::vector<TaskPtr>
Create(const JobPtr &job);
public:
public:
static std::vector<TaskPtr>
Create(const SearchJobPtr &job);
......@@ -52,6 +51,6 @@ public:
Create(const DeleteJobPtr &job);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,7 +16,7 @@
// under the License.
#include "TaskTable.h"
#include "scheduler/TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include "Utils.h"
......@@ -24,7 +24,6 @@
#include <sstream>
#include <ctime>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -75,6 +74,7 @@ TaskTableItem::Load() {
}
return false;
}
bool
TaskTableItem::Loaded() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -86,6 +86,7 @@ TaskTableItem::Loaded() {
}
return false;
}
bool
TaskTableItem::Execute() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -97,6 +98,7 @@ TaskTableItem::Execute() {
}
return false;
}
bool
TaskTableItem::Executed() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -109,6 +111,7 @@ TaskTableItem::Executed() {
}
return false;
}
bool
TaskTableItem::Move() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -120,6 +123,7 @@ TaskTableItem::Move() {
}
return false;
}
bool
TaskTableItem::Moved() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -206,7 +210,6 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
}
}
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
return table_[index];
......@@ -232,6 +235,6 @@ TaskTable::Dump() {
return ss.str();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -20,11 +20,13 @@
#include <vector>
#include <deque>
#include <mutex>
#include <memory>
#include <utility>
#include <string>
#include "task/SearchTask.h"
#include "event/Event.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -52,7 +54,8 @@ struct TaskTimestamp {
};
struct TaskTableItem {
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {}
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
}
TaskTableItem(const TaskTableItem &src) = delete;
TaskTableItem(TaskTableItem &&) = delete;
......@@ -91,7 +94,7 @@ struct TaskTableItem {
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
class TaskTable {
public:
public:
TaskTable() = default;
TaskTable(const TaskTable &) = delete;
......@@ -145,24 +148,28 @@ public:
return table_.size();
}
public:
public:
TaskTableItemPtr &
operator[](uint64_t index) {
return table_[index];
}
std::deque<TaskTableItemPtr>::iterator begin() { return table_.begin(); }
std::deque<TaskTableItemPtr>::iterator end() { return table_.end(); }
std::deque<TaskTableItemPtr>::iterator begin() {
return table_.begin();
}
public:
std::deque<TaskTableItemPtr>::iterator end() {
return table_.end();
}
public:
std::vector<uint64_t>
PickToLoad(uint64_t limit);
std::vector<uint64_t>
PickToExecute(uint64_t limit);
public:
public:
/******** Action ********/
// TODO: bool to Status
......@@ -227,14 +234,14 @@ public:
return table_[index]->Moved();
}
public:
public:
/*
* Dump;
*/
std::string
Dump();
private:
private:
std::uint64_t id_ = 0;
mutable std::mutex id_mutex_;
std::deque<TaskTableItemPtr> table_;
......@@ -246,7 +253,6 @@ private:
uint64_t last_finish_ = -1;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,12 +16,11 @@
// under the License.
#include "Utils.h"
#include "scheduler/Utils.h"
#include <chrono>
#include <cuda_runtime.h>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -41,6 +40,6 @@ get_num_gpu() {
return n_devices;
}
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -18,7 +18,6 @@
#include <cstdint>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -29,6 +28,6 @@ get_current_timestamp();
uint64_t
get_num_gpu();
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,16 +17,17 @@
#pragma once
#include "../resource/Resource.h"
#include "../ResourceMgr.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/ResourceMgr.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class Action {
public:
public:
static void
PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self);
......@@ -43,10 +44,8 @@ public:
SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,7 +22,6 @@
#include "src/cache/GpuCacheMgr.h"
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -57,13 +56,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) {
return neighbours;
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t > speeds;
std::vector<uint64_t> speeds;
uint64_t total_speed = 0;
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
......@@ -87,7 +85,6 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
} else {
//TODO: process
}
}
void
......@@ -99,14 +96,14 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
}
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) {
dest->task_table().Put(task);
}
void
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr,
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
......@@ -135,8 +132,8 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr,
void
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
......@@ -181,7 +178,6 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -18,6 +18,8 @@
#pragma once
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
......@@ -33,11 +35,12 @@ enum class EventType {
class Resource;
class Event {
public:
public:
explicit
Event(EventType type, std::weak_ptr<Resource> resource)
: type_(type),
resource_(std::move(resource)) {}
resource_(std::move(resource)) {
}
inline EventType
Type() const {
......@@ -49,13 +52,13 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Event &event);
public:
public:
EventType type_;
std::weak_ptr<Resource> resource_;
};
using EventPtr = std::shared_ptr<Event>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,36 +22,40 @@
#include "FinishTaskEvent.h"
#include "TaskTableUpdatedEvent.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const Event &event) {
std::ostream &
operator<<(std::ostream &out, const Event &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) {
std::ostream &
operator<<(std::ostream &out, const StartUpEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const LoadCompletedEvent &event) {
std::ostream &
operator<<(std::ostream &out, const LoadCompletedEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) {
std::ostream &
operator<<(std::ostream &out, const FinishTaskEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
std::ostream &
operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
out << event.Dump();
return out;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,18 +17,22 @@
#pragma once
#include "Event.h"
#include "scheduler/event/Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class FinishTaskEvent : public Event {
public:
public:
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::FINISH_TASK, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
task_table_item_(std::move(task_table_item)) {
}
inline std::string
Dump() const override {
......@@ -37,10 +41,10 @@ public:
friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event);
public:
public:
TaskTableItemPtr task_table_item_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,19 +17,23 @@
#pragma once
#include "Event.h"
#include "../TaskTable.h"
#include "scheduler/event/Event.h"
#include "scheduler/TaskTable.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class LoadCompletedEvent : public Event {
public:
public:
LoadCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::LOAD_COMPLETED, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
task_table_item_(std::move(task_table_item)) {
}
inline std::string
Dump() const override {
......@@ -38,10 +42,10 @@ public:
friend std::ostream &operator<<(std::ostream &out, const LoadCompletedEvent &event);
public:
public:
TaskTableItemPtr task_table_item_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,18 +17,21 @@
#pragma once
#include "Event.h"
#include "scheduler/event/Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {}
public:
explicit StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {
}
inline std::string
Dump() const override {
......@@ -38,6 +41,6 @@ public:
friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event);
};
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -19,16 +19,19 @@
#include "Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
public:
explicit TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {
}
inline std::string
Dump() const override {
......@@ -38,7 +41,6 @@ public:
friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include "DeleteJob.h"
#include "scheduler/job/DeleteJob.h"
#include <utility>
namespace zilliz {
namespace milvus {
......@@ -29,15 +30,20 @@ DeleteJob::DeleteJob(JobId id,
: Job(id, JobType::DELETE),
table_id_(std::move(table_id)),
meta_ptr_(std::move(meta_ptr)),
num_resource_(num_resource) {}
num_resource_(num_resource) {
}
void DeleteJob::WaitAndDelete() {
void
DeleteJob::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
cv_.wait(lock, [&] {
return done_resource == num_resource_;
});
meta_ptr_->DeleteTableFiles(table_id_);
}
void DeleteJob::ResourceDone() {
void
DeleteJob::ResourceDone() {
{
std::lock_guard<std::mutex> lock(mutex_);
++done_resource;
......@@ -45,7 +51,6 @@ void DeleteJob::ResourceDone() {
cv_.notify_one();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -30,26 +30,25 @@
#include "Job.h"
#include "db/meta/Meta.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class DeleteJob : public Job {
public:
public:
DeleteJob(JobId id,
std::string table_id,
engine::meta::MetaPtr meta_ptr,
uint64_t num_resource);
public:
public:
void
WaitAndDelete();
void
ResourceDone();
public:
public:
std::string
table_id() const {
return table_id_;
......@@ -60,7 +59,7 @@ public:
return meta_ptr_;
}
private:
private:
std::string table_id_;
engine::meta::MetaPtr meta_ptr_;
......@@ -72,7 +71,6 @@ private:
using DeleteJobPtr = std::shared_ptr<DeleteJob>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -27,7 +27,6 @@
#include <condition_variable>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -42,7 +41,7 @@ enum class JobType {
using JobId = std::uint64_t;
class Job {
public:
public:
inline JobId
id() const {
return id_;
......@@ -53,10 +52,11 @@ public:
return type_;
}
protected:
Job(JobId id, JobType type) : id_(id), type_(type) {}
protected:
Job(JobId id, JobType type) : id_(id), type_(type) {
}
private:
private:
JobId id_;
JobType type_;
};
......@@ -64,7 +64,6 @@ private:
using JobPtr = std::shared_ptr<Job>;
using JobWPtr = std::weak_ptr<Job>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/job/SearchJob.h"
#include "utils/Log.h"
#include "SearchJob.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -33,7 +31,8 @@ SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id,
topk_(topk),
nq_(nq),
nprobe_(nprobe),
vectors_(vectors) {}
vectors_(vectors) {
}
bool
SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
......@@ -48,11 +47,12 @@ SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
return true;
}
void
SearchJob::WaitResult() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return index_files_.empty(); });
cv_.wait(lock, [this] {
return index_files_.empty();
});
SERVER_LOG_DEBUG << "SearchJob " << id() << " all done";
}
......@@ -69,14 +69,11 @@ SearchJob::GetResult() {
return result_;
}
Status&
Status &
SearchJob::GetStatus() {
return status_;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -26,16 +26,15 @@
#include <mutex>
#include <condition_variable>
#include <memory>
#include <utility>
#include "Job.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
using engine::meta::TableFileSchemaPtr;
using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
......@@ -43,10 +42,10 @@ using Id2DistanceMap = std::vector<std::pair<int64_t, double>>;
using ResultSet = std::vector<Id2DistanceMap>;
class SearchJob : public Job {
public:
public:
SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors);
public:
public:
bool
AddIndexFile(const TableFileSchemaPtr &index_file);
......@@ -62,7 +61,7 @@ public:
Status &
GetStatus();
public:
public:
uint64_t
topk() const {
return topk_;
......@@ -77,6 +76,7 @@ public:
nprobe() const {
return nprobe_;
}
const float *
vectors() const {
return vectors_;
......@@ -87,7 +87,7 @@ public:
return index_files_;
}
private:
private:
uint64_t topk_ = 0;
uint64_t nq_ = 0;
uint64_t nprobe_ = 0;
......@@ -105,7 +105,6 @@ private:
using SearchJobPtr = std::shared_ptr<SearchJob>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -19,17 +19,18 @@
#include <string>
#include <sstream>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
class Connection {
public:
public:
// TODO: update construct function, speed: double->uint64_t
Connection(std::string name, double speed)
: name_(std::move(name)), speed_(speed) {}
: name_(std::move(name)), speed_(speed) {
}
const std::string &
name() const {
......@@ -46,7 +47,7 @@ public:
return 1024 / speed_;
}
public:
public:
std::string
Dump() const {
std::stringstream ss;
......@@ -54,12 +55,11 @@ public:
return ss.str();
}
private:
private:
std::string name_;
uint64_t speed_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,29 +16,34 @@
// under the License.
#include "CpuResource.h"
#include "scheduler/resource/CpuResource.h"
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
std::ostream &
operator<<(std::ostream &out, const CpuResource &resource) {
out << resource.Dump();
return out;
}
CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {}
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {
}
void CpuResource::LoadFile(TaskPtr task) {
void
CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
}
void CpuResource::Process(TaskPtr task) {
void
CpuResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -21,13 +21,12 @@
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class CpuResource : public Resource {
public:
public:
explicit
CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
......@@ -38,7 +37,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
protected:
protected:
void
LoadFile(TaskPtr task) override;
......@@ -46,6 +45,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,14 +15,17 @@
// specific language governing permissions and limitations
// under the License.
#include "DiskResource.h"
#include "scheduler/resource/DiskResource.h"
#include <string>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
std::ostream &
operator<<(std::ostream &out, const DiskResource &resource) {
out << resource.Dump();
return out;
}
......@@ -31,15 +34,14 @@ DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loa
: Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
void
DiskResource::LoadFile(TaskPtr task) {
}
void DiskResource::Process(TaskPtr task) {
void
DiskResource::Process(TaskPtr task) {
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,16 +17,16 @@
#pragma once
#include "Resource.h"
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class DiskResource : public Resource {
public:
public:
explicit
DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
......@@ -37,7 +37,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
protected:
protected:
void
LoadFile(TaskPtr task) override;
......@@ -45,6 +45,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,29 +16,32 @@
// under the License.
#include "GpuResource.h"
#include "scheduler/resource/GpuResource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
std::ostream &
operator<<(std::ostream &out, const GpuResource &resource) {
out << resource.Dump();
return out;
}
GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {}
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {
}
void GpuResource::LoadFile(TaskPtr task) {
void
GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, device_id_);
}
void GpuResource::Process(TaskPtr task) {
void
GpuResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,16 +17,17 @@
#pragma once
#include "Resource.h"
#include <string>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
class GpuResource : public Resource {
public:
public:
explicit
GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
......@@ -37,7 +38,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
protected:
protected:
void
LoadFile(TaskPtr task) override;
......@@ -45,6 +46,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/resource/Node.h"
#include <atomic>
#include "Node.h"
#include <utility>
namespace zilliz {
namespace milvus {
......@@ -29,7 +29,8 @@ Node::Node() {
id_ = counter++;
}
std::vector<Neighbour> Node::GetNeighbours() {
std::vector<Neighbour>
Node::GetNeighbours() {
std::lock_guard<std::mutex> lk(mutex_);
std::vector<Neighbour> ret;
for (auto &e : neighbours_) {
......@@ -38,7 +39,8 @@ std::vector<Neighbour> Node::GetNeighbours() {
return ret;
}
std::string Node::Dump() {
std::string
Node::Dump() {
std::stringstream ss;
ss << "<Node, id=" << std::to_string(id_) << ">::neighbours:" << std::endl;
for (auto &neighbour : neighbours_) {
......@@ -48,7 +50,8 @@ std::string Node::Dump() {
return ss.str();
}
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
void
Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
......@@ -56,6 +59,6 @@ void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &conn
// else do nothing, consider it..
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -20,11 +20,11 @@
#include <vector>
#include <memory>
#include <map>
#include <string>
#include "../TaskTable.h"
#include "scheduler/TaskTable.h"
#include "Connection.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -34,8 +34,9 @@ class Node;
using NeighbourNodePtr = std::weak_ptr<Node>;
struct Neighbour {
Neighbour(NeighbourNodePtr nei, Connection conn)
: neighbour_node(nei), connection(conn) {}
Neighbour(NeighbourNodePtr nei, Connection conn)
: neighbour_node(nei), connection(conn) {
}
NeighbourNodePtr neighbour_node;
Connection connection;
......@@ -43,7 +44,7 @@ struct Neighbour {
// TODO(linxj): return type void -> Status
class Node {
public:
public:
Node();
void
......@@ -52,11 +53,11 @@ public:
std::vector<Neighbour>
GetNeighbours();
public:
public:
std::string
Dump();
private:
private:
std::mutex mutex_;
uint8_t id_;
std::map<uint8_t, Neighbour> neighbours_;
......@@ -65,6 +66,6 @@ private:
using NodePtr = std::shared_ptr<Node>;
using NodeWPtr = std::weak_ptr<Node>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
#include <iostream>
#include "../Utils.h"
#include "Resource.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/Utils.h"
#include <iostream>
#include <utility>
namespace zilliz {
namespace milvus {
......@@ -100,7 +101,8 @@ Resource::NumOfTaskToExec() {
return count;
}
TaskTableItemPtr Resource::pick_task_load() {
TaskTableItemPtr
Resource::pick_task_load() {
auto indexes = task_table_.PickToLoad(10);
for (auto index : indexes) {
// try to set one task loading, then return
......@@ -111,7 +113,8 @@ TaskTableItemPtr Resource::pick_task_load() {
return nullptr;
}
TaskTableItemPtr Resource::pick_task_execute() {
TaskTableItemPtr
Resource::pick_task_execute() {
auto indexes = task_table_.PickToExecute(3);
for (auto index : indexes) {
// try to set one task executing, then return
......@@ -122,10 +125,13 @@ TaskTableItemPtr Resource::pick_task_execute() {
return nullptr;
}
void Resource::loader_function() {
void
Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
load_cv_.wait(lock, [&] {
return load_flag_;
});
load_flag_ = false;
lock.unlock();
while (true) {
......@@ -140,18 +146,20 @@ void Resource::loader_function() {
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
void Resource::executor_function() {
void
Resource::executor_function() {
if (subscriber_) {
auto event = std::make_shared<StartUpEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
}
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_cv_.wait(lock, [&] {
return exec_flag_;
});
exec_flag_ = false;
lock.unlock();
while (true) {
......@@ -172,10 +180,9 @@ void Resource::executor_function() {
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include <memory>
#include <utility>
#include <thread>
#include <functional>
#include <condition_variable>
......@@ -34,7 +35,6 @@
#include "Connection.h"
#include "Node.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -104,7 +104,7 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
return task_table_;
}
public:
public:
inline bool
HasLoader() const {
return enable_loader_;
......@@ -212,7 +212,6 @@ public:
using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.
#include "TestResource.h"
#include "scheduler/resource/TestResource.h"
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const TestResource &resource) {
std::ostream &
operator<<(std::ostream &out, const TestResource &resource) {
out << resource.Dump();
return out;
}
......@@ -31,15 +33,16 @@ TestResource::TestResource(std::string name, uint64_t device_id, bool enable_loa
: Resource(std::move(name), ResourceType::TEST, device_id, enable_loader, enable_executor) {
}
void TestResource::LoadFile(TaskPtr task) {
void
TestResource::LoadFile(TaskPtr task) {
task->Load(LoadType::TEST, 0);
}
void TestResource::Process(TaskPtr task) {
void
TestResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -17,16 +17,17 @@
#pragma once
#include "Resource.h"
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
class TestResource : public Resource {
public:
public:
explicit
TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
......@@ -37,7 +38,7 @@ public:
friend std::ostream &operator<<(std::ostream &out, const TestResource &resource);
protected:
protected:
void
LoadFile(TaskPtr task) override;
......@@ -45,6 +46,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -16,19 +16,18 @@
// under the License.
#include "DeleteTask.h"
#include "scheduler/task/DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr &delete_job)
: Task(TaskType::DeleteTask), delete_job_(delete_job) {}
: Task(TaskType::DeleteTask), delete_job_(delete_job) {
}
void
XDeleteTask::Load(LoadType type, uint8_t device_id) {
}
void
......@@ -36,6 +35,6 @@ XDeleteTask::Execute() {
delete_job_->ResourceDone();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -20,15 +20,13 @@
#include "scheduler/job/DeleteJob.h"
#include "Task.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class XDeleteTask : public Task {
public:
explicit
XDeleteTask(const scheduler::DeleteJobPtr &job);
public:
explicit XDeleteTask(const scheduler::DeleteJobPtr &job);
void
Load(LoadType type, uint8_t device_id) override;
......@@ -36,10 +34,10 @@ public:
void
Execute() override;
public:
public:
scheduler::DeleteJobPtr delete_job_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -20,7 +20,6 @@
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -29,7 +28,8 @@ class Path {
public:
Path() = default;
Path(std::vector<std::string>& path, uint64_t index) : path_(path), index_(index) {}
Path(std::vector<std::string> &path, uint64_t index) : path_(path), index_(index) {
}
void
push_back(const std::string &str) {
......@@ -49,7 +49,6 @@ class Path {
} else {
return nullptr;
}
}
std::string
......@@ -67,14 +66,19 @@ class Path {
return path_[index];
}
std::vector<std::string>::iterator begin() { return path_.begin(); }
std::vector<std::string>::iterator end() { return path_.end(); }
std::vector<std::string>::iterator begin() {
return path_.begin();
}
std::vector<std::string>::iterator end() {
return path_.end();
}
public:
std::vector<std::string> path_;
uint64_t index_ = 0;
};
}
}
}
\ No newline at end of file
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,15 +15,16 @@
// specific language governing permissions and limitations
// under the License.
#include "SearchTask.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/job/SearchJob.h"
#include "metrics/Metrics.h"
#include "db/engine/EngineFactory.h"
#include "utils/TimeRecorder.h"
#include "utils/Log.h"
#include <thread>
#include "scheduler/job/SearchJob.h"
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
......@@ -104,7 +105,6 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file)
(MetricType) file_->metric_type_,
file_->nlist_);
}
}
void
......@@ -144,7 +144,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (auto job = job_.lock()){
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
search_job->SearchDone(file_->id_);
search_job->GetStatus() = s;
......@@ -183,7 +183,7 @@ XSearchTask::Execute() {
server::CollectDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<int64_t> output_ids;
std::vector<float> output_distance;
if (auto job = job_.lock()) {
......@@ -192,7 +192,7 @@ XSearchTask::Execute() {
uint64_t nq = search_job->nq();
uint64_t topk = search_job->topk();
uint64_t nprobe = search_job->nprobe();
const float* vectors = search_job->vectors();
const float *vectors = search_job->vectors();
output_ids.resize(topk * nq);
output_distance.resize(topk * nq);
......@@ -236,11 +236,12 @@ XSearchTask::Execute() {
index_engine_ = nullptr;
}
Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distance,
uint64_t nq,
uint64_t topk,
scheduler::ResultSet &result_set) {
Status
XSearchTask::ClusterResult(const std::vector<int64_t> &output_ids,
const std::vector<float> &output_distance,
uint64_t nq,
uint64_t topk,
scheduler::ResultSet &result_set) {
if (output_ids.size() < nq * topk || output_distance.size() < nq * topk) {
std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) +
" distance array size: " + std::to_string(output_distance.size());
......@@ -275,10 +276,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
return Status::OK();
}
Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
scheduler::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending) {
Status
XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
scheduler::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending) {
//Note: the score_src and score_target are already arranged by score in ascending order
if (distance_src.empty()) {
ENGINE_LOG_WARNING << "Empty distance source array";
......@@ -349,10 +351,11 @@ Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
return Status::OK();
}
Status XSearchTask::TopkResult(scheduler::ResultSet &result_src,
uint64_t topk,
bool ascending,
scheduler::ResultSet &result_target) {
Status
XSearchTask::TopkResult(scheduler::ResultSet &result_src,
uint64_t topk,
bool ascending,
scheduler::ResultSet &result_target) {
if (result_target.empty()) {
result_target.swap(result_src);
return Status::OK();
......@@ -381,7 +384,6 @@ Status XSearchTask::TopkResult(scheduler::ResultSet &result_src,
return Status::OK();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -21,6 +21,7 @@
#include "scheduler/job/SearchJob.h"
#include "scheduler/Definition.h"
#include <vector>
namespace zilliz {
namespace milvus {
......@@ -28,9 +29,8 @@ namespace scheduler {
// TODO: rewrite
class XSearchTask : public Task {
public:
explicit
XSearchTask(TableFileSchemaPtr file);
public:
explicit XSearchTask(TableFileSchemaPtr file);
void
Load(LoadType type, uint8_t device_id) override;
......@@ -38,8 +38,8 @@ public:
void
Execute() override;
public:
static Status ClusterResult(const std::vector<long> &output_ids,
public:
static Status ClusterResult(const std::vector<int64_t> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
......@@ -55,7 +55,7 @@ public:
bool ascending,
scheduler::ResultSet &result_target);
public:
public:
TableFileSchemaPtr file_;
size_t index_id_ = 0;
......@@ -66,6 +66,6 @@ public:
static std::mutex merge_mutex_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -25,7 +25,6 @@
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -49,20 +48,22 @@ using TaskPtr = std::shared_ptr<Task>;
// TODO: re-design
class Task {
public:
explicit
Task(TaskType type) : type_(type) {}
public:
explicit Task(TaskType type) : type_(type) {
}
/*
* Just Getter;
*/
inline TaskType
Type() const { return type_; }
Type() const {
return type_;
}
/*
* Transport path;
*/
inline Path&
inline Path &
path() {
return task_path_;
}
......@@ -75,14 +76,14 @@ public:
return label_;
}
public:
public:
virtual void
Load(LoadType type, uint8_t device_id) = 0;
virtual void
Execute() = 0;
public:
public:
Path task_path_;
// std::vector<SearchContextPtr> search_contexts_;
scheduler::JobWPtr job_;
......@@ -90,7 +91,6 @@ public:
TaskLabelPtr label_ = nullptr;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -15,16 +15,15 @@
// specific language governing permissions and limitations
// under the License.
#include <src/cache/GpuCacheMgr.h>
#include "TestTask.h"
#include "scheduler/task/TestTask.h"
#include "cache/GpuCacheMgr.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {}
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {
}
void
TestTask::Load(LoadType type, uint8_t device_id) {
......@@ -44,10 +43,11 @@ TestTask::Execute() {
void
TestTask::Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_; });
}
}
}
cv_.wait(lock, [&] {
return done_;
});
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -19,17 +19,15 @@
#include "SearchTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class TestTask : public XSearchTask {
public:
explicit
TestTask(TableFileSchemaPtr& file);
public:
explicit TestTask(TableFileSchemaPtr &file);
public:
public:
void
Load(LoadType type, uint8_t device_id) override;
......@@ -39,7 +37,7 @@ public:
void
Wait();
public:
public:
uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
......@@ -48,7 +46,6 @@ public:
std::condition_variable cv_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -21,19 +21,19 @@
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class BroadcastLabel : public TaskLabel {
public:
BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {}
public:
BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {
}
};
using BroadcastLabelPtr = std::shared_ptr<BroadcastLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -21,20 +21,18 @@
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class DefaultLabel : public TaskLabel {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {}
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
}
};
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,7 +22,6 @@
#include <string>
#include <memory>
class Resource;
using ResourceWPtr = std::weak_ptr<Resource>;
......@@ -32,9 +31,10 @@ namespace milvus {
namespace scheduler {
class SpecResLabel : public TaskLabel {
public:
SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {}
public:
explicit SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {
}
inline ResourceWPtr &
resource() {
......@@ -46,14 +46,13 @@ public:
return resource_name_;
}
private:
private:
ResourceWPtr resource_;
std::string resource_name_;
};
using SpecResLabelPtr = std::shared_ptr<SpecResLabel>();
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -30,23 +30,22 @@ enum class TaskLabelType {
};
class TaskLabel {
public:
public:
inline TaskLabelType
Type() const {
return type_;
}
protected:
explicit
TaskLabel(TaskLabelType type) : type_(type) {}
protected:
explicit TaskLabel(TaskLabelType type) : type_(type) {
}
private:
private:
TaskLabelType type_;
};
using TaskLabelPtr = std::shared_ptr<TaskLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册