提交 923406da 编写于 作者: Y Yu Kun

MS-639: SQ8H index creation fails and the server hangs


Former-commit-id: f57b0eb2423ee7d99fb58f06a68e828b3273d0f9
上级 f30822e3
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "BuildMgr.h"
namespace milvus {
namespace scheduler {
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace scheduler {
/**
 * Thread-safe counter used by the scheduler to meter index-build tasks.
 *
 * The object wraps one atomic integer. take() decrements it, Put() increments
 * it, and numoftasks() reports the current value. No lower or upper bound is
 * enforced here: callers are expected to check numoftasks() before take().
 */
class BuildMgr {
 public:
    /**
     * @param numoftasks initial value of the counter.
     */
    explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) {
    }

 public:
    /** Increment the counter by one. */
    void
    Put() {
        ++numoftasks_;
    }

    /** Decrement the counter by one; the value is not clamped at zero. */
    void
    take() {
        --numoftasks_;
    }

    /**
     * @return the current counter value.
     *
     * Declared const so the counter can be inspected through a const
     * reference or pointer; the atomic is read with an explicit load().
     */
    int64_t
    numoftasks() const {
        return static_cast<int64_t>(numoftasks_.load());
    }

 private:
    std::atomic_long numoftasks_;
};

using BuildMgrPtr = std::shared_ptr<BuildMgr>;
} // namespace scheduler
} // namespace milvus
...@@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_; ...@@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_;
OptimizerPtr OptimizerInst::instance = nullptr; OptimizerPtr OptimizerInst::instance = nullptr;
std::mutex OptimizerInst::mutex_; std::mutex OptimizerInst::mutex_;
BuildMgrPtr BuildMgrInst::instance = nullptr;
std::mutex BuildMgrInst::mutex_;
void void
load_simple_config() { load_simple_config() {
server::Config& config = server::Config::GetInstance(); server::Config& config = server::Config::GetInstance();
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#pragma once #pragma once
#include "BuildMgr.h"
#include "JobMgr.h" #include "JobMgr.h"
#include "ResourceMgr.h" #include "ResourceMgr.h"
#include "Scheduler.h" #include "Scheduler.h"
...@@ -105,6 +106,24 @@ class OptimizerInst { ...@@ -105,6 +106,24 @@ class OptimizerInst {
static std::mutex mutex_; static std::mutex mutex_;
}; };
/**
 * Accessor for the single shared BuildMgr.
 *
 * The manager is created on the first GetInstance() call and the same
 * pointer is returned on every later call.
 */
class BuildMgrInst {
 public:
    /**
     * Return the shared BuildMgr, creating it on first use.
     *
     * `instance` is a plain std::shared_ptr, so it is only read and written
     * while holding `mutex_`. Checking it before taking the lock would be an
     * unsynchronized read that can race with the assignment below.
     */
    static BuildMgrPtr
    GetInstance() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (instance == nullptr) {
            // 4 is the initial counter value handed to BuildMgr.
            instance = std::make_shared<BuildMgr>(4);
        }
        return instance;
    }

 private:
    static BuildMgrPtr instance;
    static std::mutex mutex_;
};
void void
StartSchedulerService(); StartSchedulerService();
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// under the License. // under the License.
#include "scheduler/TaskTable.h" #include "scheduler/TaskTable.h"
#include "scheduler/SchedInst.h"
#include "Utils.h" #include "Utils.h"
#include "event/TaskTableUpdatedEvent.h" #include "event/TaskTableUpdatedEvent.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -164,6 +165,15 @@ TaskTable::PickToLoad(uint64_t limit) { ...@@ -164,6 +165,15 @@ TaskTable::PickToLoad(uint64_t limit) {
if (not table_[j]) { if (not table_[j]) {
SERVER_LOG_WARNING << "table[" << j << "] is nullptr"; SERVER_LOG_WARNING << "table[" << j << "] is nullptr";
} }
if (table_[j]->task->path().Current() == "cpu") {
if (table_[j]->task->Type() == TaskType::BuildIndexTask
&& BuildMgrInst::GetInstance()->numoftasks() < 1) {
return std::vector<uint64_t>();
}
}
if (table_[j]->state == TaskTableItemState::LOADED) { if (table_[j]->state == TaskTableItemState::LOADED) {
++count; ++count;
if (count > 2) if (count > 2)
...@@ -177,9 +187,21 @@ TaskTable::PickToLoad(uint64_t limit) { ...@@ -177,9 +187,21 @@ TaskTable::PickToLoad(uint64_t limit) {
if (not cross && table_[i]->IsFinish()) { if (not cross && table_[i]->IsFinish()) {
last_finish_ = i; last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::START) { } else if (table_[i]->state == TaskTableItemState::START) {
cross = true; auto task = table_[i]->task;
indexes.push_back(i); if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
++count; if (BuildMgrInst::GetInstance()->numoftasks() == 0) {
break;
} else {
cross = true;
indexes.push_back(i);
++count;
BuildMgrInst::GetInstance()->take();
}
} else {
cross = true;
indexes.push_back(i);
++count;
}
} }
} }
return indexes; return indexes;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// under the License. // under the License.
#include "scheduler/resource/Resource.h" #include "scheduler/resource/Resource.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h" #include "scheduler/Utils.h"
#include <iostream> #include <iostream>
...@@ -111,11 +112,18 @@ Resource::pick_task_load() { ...@@ -111,11 +112,18 @@ Resource::pick_task_load() {
TaskTableItemPtr TaskTableItemPtr
Resource::pick_task_execute() { Resource::pick_task_execute() {
auto indexes = task_table_.PickToExecute(3); // auto indexes = task_table_.PickToExecute(3);
auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
for (auto index : indexes) { for (auto index : indexes) {
// try to set one task executing, then return // try to set one task executing, then return
if (task_table_.Execute(index)) // if (task_table_.Execute(index))
return task_table_.Get(index); // return task_table_.Get(index);
if (task_table_.Get(index)->task->path().Current()
== task_table_.Get(index)->task->path().Last()) {
if (task_table_.Execute(index)) {
return task_table_.Get(index);
}
}
// else try next // else try next
} }
return nullptr; return nullptr;
...@@ -167,6 +175,12 @@ Resource::executor_function() { ...@@ -167,6 +175,12 @@ Resource::executor_function() {
total_cost_ += finish - start; total_cost_ += finish - start;
task_item->Executed(); task_item->Executed();
if (task_item->task->Type() == TaskType::BuildIndexTask) {
BuildMgrInst::GetInstance()->Put();
ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader();
}
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item); auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
......
...@@ -40,13 +40,22 @@ class Path { ...@@ -40,13 +40,22 @@ class Path {
return path_; return path_;
} }
/**
 * Return the path element at the current index.
 *
 * @return path_[index_], or an empty string when the path is empty or the
 *         index is not a valid position in it.
 */
std::string
Current() {
    // Guard clause: nothing to return without a valid position.
    if (path_.empty() || path_.size() <= index_) {
        return "";
    }
    return path_[index_];
}
std::string std::string
Next() { Next() {
if (index_ > 0 && !path_.empty()) { if (index_ > 0 && !path_.empty()) {
--index_; --index_;
return path_[index_]; return path_[index_];
} else { } else {
return nullptr; return "";
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册