提交 f9e74971 编写于 作者: J jinhai

Merge branch '0.5.1' into '0.5.1'

0.5.1

See merge request megasearch/milvus!801

Former-commit-id: 26d6c4b4741295f1bfaac3ad17a54a79e49d0995
......@@ -5,8 +5,12 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.5.1 (TODO)
## Bug
## Feature
- \#90 - The server start error messages could be improved to enhance user experience
- \#104 - test_scheduler core dump
- \#115 - Using new structure for tasktable
- \#139 - New config option use_gpu_threshold
## Improvement
- \#64 - Improve dump function in scheduler
......@@ -16,9 +20,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job
## Feature
- \#115 - Using new structure for tasktable
- \#130 - Set task state MOVED after resource copy is completed
## Task
......
......@@ -36,6 +36,7 @@ cache_config:
engine_config:
use_blas_threshold: 20 # if nq < use_blas_threshold, use SSE, faster with fluctuating response times
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
use_gpu_threshold: 1000
resource_config:
search_resources: # define the GPUs used for search computation, must be in format: gpux
......
......@@ -91,7 +91,7 @@ JobMgr::worker_function() {
// disk resources NEVER be empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
for (auto& task : tasks) {
disk->task_table().Put(task);
disk->task_table().Put(task, nullptr);
}
}
}
......
......@@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
break;
}
default: { break; }
......
......@@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
}
void
TaskTable::Put(TaskPtr task) {
auto item = std::make_shared<TaskTableItem>();
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
auto item = std::make_shared<TaskTableItem>(std::move(from));
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
......@@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
}
}
/*
 * Batch variant of Put: wraps every task of `tasks` in a new TaskTableItem and
 * appends it to the table, then notifies the subscriber once for the whole batch.
 * The tasks are moved out of `tasks`, so the vector's elements are left in a
 * moved-from state after the call.
 */
void
TaskTable::Put(std::vector<TaskPtr>& tasks) {
    for (auto& pending : tasks) {
        auto entry = std::make_shared<TaskTableItem>();
        // ids are handed out sequentially from the table-wide counter
        entry->id = id_++;
        entry->state = TaskTableItemState::START;
        entry->task = std::move(pending);
        entry->timestamp.start = get_current_timestamp();
        table_.put(std::move(entry));
    }

    // a single notification after all items were inserted (only if a subscriber is set)
    if (subscriber_) {
        subscriber_();
    }
}
size_t
TaskTable::TaskToExecute() {
size_t count = 0;
......
......@@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
Dump() const override;
};
struct TaskTableItem;
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
struct TaskTableItem : public interface::dumpable {
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
explicit TaskTableItem(TaskTableItemPtr f = nullptr)
: id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) {
}
TaskTableItem(const TaskTableItem& src) = delete;
......@@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
TaskTableItemState state; // the state;
std::mutex mutex;
TaskTimestamp timestamp;
TaskTableItemPtr from;
bool
IsFinish();
......@@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
Dump() const override;
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
class TaskTable : public interface::dumpable {
public:
TaskTable() : table_(1ULL << 16ULL) {
......@@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
* Put one task;
*/
void
Put(TaskPtr task);
/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put(std::vector<TaskPtr>& tasks);
Put(TaskPtr task, TaskTableItemPtr from = nullptr);
size_t
TaskToExecute();
......
......@@ -28,13 +28,13 @@ namespace scheduler {
class Action {
public:
static void
PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self);
PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self);
static void
PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self);
PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self);
static void
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
......
......@@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t> speeds;
......@@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
neighbours[i].first->task_table().Put(task_item->task, task_item);
return;
}
}
......@@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
// Puts the task into the task table of every resource returned by get_neighbours(self).
// NOTE(review): this span looks like a flattened diff hunk -- the two signature lines and the
// two Put(...) lines appear to be the pre-change / post-change versions of the same line;
// confirm against the repository before relying on either.
// In the TaskTableItemPtr version the source item is forwarded as the second argument of Put()
// together with its task; task_item is dereferenced without a null check, so callers are
// assumed to pass a non-null item -- TODO confirm.
void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours(self);
for (auto& neighbour : neighbours) {
neighbour->task_table().Put(task);
neighbour->task_table().Put(task_item->task, task_item);
}
}
// Puts the task into the task table of the single destination resource `dest`.
// NOTE(review): this span looks like a flattened diff hunk -- the first signature/Put pair
// appears to be the pre-change version and the second pair the post-change version; confirm
// against the repository.
// The TaskTableItemPtr version passes the originating item as the second argument of Put();
// task_item is dereferenced without a null check -- assumes callers pass a non-null item,
// TODO confirm.
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task);
Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) {
dest->task_table().Put(task_item->task, task_item);
}
void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
......@@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
......@@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
}
if (not moved) {
PushTaskToNeighbourRandomly(task, resource);
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
......@@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
......@@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
// next_res->task_table().Put(task);
// }
event->task_table_item_->Move();
next_res->task_table().Put(task);
next_res->task_table().Put(task, task_item);
}
}
......
......@@ -21,11 +21,20 @@
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
// Reads the use_gpu_threshold engine setting from the server configuration into threshold_.
// When the configuration lookup reports an error, threshold_ is set to the largest int32_t value.
LargeSQ8HPass::LargeSQ8HPass() {
    auto& server_config = server::Config::GetInstance();
    Status status = server_config.GetEngineConfigUseGpuThreshold(threshold_);
    if (status.ok()) {
        return;
    }

    // lookup failed: fall back to the maximum threshold
    threshold_ = std::numeric_limits<int32_t>::max();
}
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
......@@ -40,7 +49,8 @@ LargeSQ8HPass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (search_job->nq() < 100) {
if (search_job->nq() < threshold_) {
return false;
}
......
......@@ -34,11 +34,14 @@ namespace scheduler {
// Scheduler pass (derives from Pass) whose Run() is invoked with a task.
// NOTE(review): this span looks like a flattened diff hunk -- both `LargeSQ8HPass() = default;`
// and `LargeSQ8HPass();` are listed; presumably only the second (user-defined constructor)
// remains after the change. Confirm against the repository.
class LargeSQ8HPass : public Pass {
public:
LargeSQ8HPass() = default;
LargeSQ8HPass();
public:
bool
Run(const TaskPtr& task) override;
private:
// nq threshold used by Run(); defaults to the largest int32_t value.
// NOTE(review): std::numeric_limits needs <limits> -- confirm this header includes it.
int32_t threshold_ = std::numeric_limits<int32_t>::max();
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
......
......@@ -180,6 +180,10 @@ Resource::loader_function() {
}
LoadFile(task_item->task);
task_item->Loaded();
if (task_item->from) {
task_item->from->Moved();
task_item->from = nullptr;
}
if (subscriber_) {
auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......
......@@ -193,6 +193,12 @@ Config::ValidateConfig() {
return s;
}
int32_t engine_use_gpu_threshold;
s = GetEngineConfigUseGpuThreshold(engine_use_gpu_threshold);
if (!s.ok()) {
return s;
}
/* resource config */
std::string resource_mode;
s = GetResourceConfigMode(resource_mode);
......@@ -324,6 +330,11 @@ Config::ResetDefaultConfig() {
return s;
}
s = SetEngineConfigUseGpuThreshold(CONFIG_ENGINE_USE_GPU_THRESHOLD_DEFAULT);
if (!s.ok()) {
return s;
}
/* resource config */
s = SetResourceConfigMode(CONFIG_RESOURCE_MODE_DEFAULT);
if (!s.ok()) {
......@@ -656,6 +667,16 @@ Config::CheckEngineConfigOmpThreadNum(const std::string& value) {
return Status::OK();
}
/*
 * Validates a candidate value for engine_config.use_gpu_threshold.
 * Returns Status::OK() when ValidationUtil::ValidateStringIsNumber accepts `value`;
 * otherwise returns SERVER_INVALID_ARGUMENT with a message naming the offending value.
 */
Status
Config::CheckEngineConfigUseGpuThreshold(const std::string& value) {
    if (ValidationUtil::ValidateStringIsNumber(value).ok()) {
        return Status::OK();
    }

    std::string msg = "Invalid gpu threshold: " + value +
                      ". Possible reason: engine_config.use_gpu_threshold is not a positive integer.";
    return Status(SERVER_INVALID_ARGUMENT, msg);
}
Status
Config::CheckResourceConfigMode(const std::string& value) {
if (value != "simple") {
......@@ -951,6 +972,19 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) {
return Status::OK();
}
/*
 * Reads engine_config.use_gpu_threshold (falling back to its default string when unset),
 * validates it and, on success, stores the parsed integer in `value`.
 * On validation failure the failing Status is returned and `value` is left untouched.
 */
Status
Config::GetEngineConfigUseGpuThreshold(int32_t& value) {
    std::string threshold_text =
        GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_USE_GPU_THRESHOLD, CONFIG_ENGINE_USE_GPU_THRESHOLD_DEFAULT);

    Status check = CheckEngineConfigUseGpuThreshold(threshold_text);
    if (check.ok()) {
        // NOTE(review): std::stoi can throw std::out_of_range for digit strings beyond int range;
        // confirm ValidateStringIsNumber rejects such input.
        value = std::stoi(threshold_text);
        return Status::OK();
    }
    return check;
}
Status
Config::GetResourceConfigMode(std::string& value) {
value = GetConfigStr(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, CONFIG_RESOURCE_MODE_DEFAULT);
......@@ -1203,6 +1237,17 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) {
return Status::OK();
}
/*
 * Validates `value` and stores it in the in-memory configuration as
 * engine_config.use_gpu_threshold.
 * Returns the failing Status from CheckEngineConfigUseGpuThreshold() when validation fails,
 * Status::OK() otherwise.
 */
Status
Config::SetEngineConfigUseGpuThreshold(const std::string& value) {
    Status s = CheckEngineConfigUseGpuThreshold(value);
    if (!s.ok()) {
        return s;
    }

    // The key belongs to the engine section: GetEngineConfigUseGpuThreshold() reads it with
    // CONFIG_ENGINE as parent, so writing it under CONFIG_DB would never be read back.
    SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_GPU_THRESHOLD, value);
    return Status::OK();
}
/* resource config */
Status
Config::SetResourceConfigMode(const std::string& value) {
......
......@@ -84,6 +84,8 @@ static const char* CONFIG_ENGINE_USE_BLAS_THRESHOLD = "use_blas_threshold";
static const char* CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT = "20";
static const char* CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num";
static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0";
static const char* CONFIG_ENGINE_USE_GPU_THRESHOLD = "use_gpu_threshold";
static const char* CONFIG_ENGINE_USE_GPU_THRESHOLD_DEFAULT = "1000";
/* resource config */
static const char* CONFIG_RESOURCE = "resource_config";
......@@ -166,6 +168,8 @@ class Config {
CheckEngineConfigUseBlasThreshold(const std::string& value);
Status
CheckEngineConfigOmpThreadNum(const std::string& value);
Status
CheckEngineConfigUseGpuThreshold(const std::string& value);
/* resource config */
Status
......@@ -230,6 +234,8 @@ class Config {
GetEngineConfigUseBlasThreshold(int32_t& value);
Status
GetEngineConfigOmpThreadNum(int32_t& value);
Status
GetEngineConfigUseGpuThreshold(int32_t& value);
/* resource config */
Status
......@@ -289,6 +295,8 @@ class Config {
SetEngineConfigUseBlasThreshold(const std::string& value);
Status
SetEngineConfigOmpThreadNum(const std::string& value);
Status
SetEngineConfigUseGpuThreshold(const std::string& value);
/* resource config */
Status
......
......@@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
// Puts task1_ and task2_ into an empty table and checks that they are stored at
// positions 0 and 1 in insertion order.
// NOTE(review): this span looks like a flattened diff hunk -- `empty_table_.Put(tasks);`
// appears to be the pre-change line and the per-task loop its replacement; confirm against
// the repository.
TEST_F(TaskTableBaseTest, PUT_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
empty_table_.Put(tasks);
for (auto& task : tasks) {
empty_table_.Put(task);
}
ASSERT_EQ(empty_table_.at(0)->task, task1_);
ASSERT_EQ(empty_table_.at(1)->task, task2_);
}
// Calls the batch Put() with an empty vector; there are no assertions, so the test only
// checks that the call completes.
TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{};
empty_table_.Put(tasks);
}
TEST_F(TaskTableBaseTest, SIZE) {
ASSERT_EQ(empty_table_.size(), 0);
empty_table_.Put(task1_);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册