提交 332398d2 编写于 作者: J Jesse Lee

Check return code from WaitPost::Register

上级 7180bafc
......@@ -94,7 +94,7 @@ Status CelebAOp::LaunchThreadsAndInitOp() {
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(attr_info_queue_->Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1)));
......
......@@ -149,7 +149,7 @@ Status CifarOp::LaunchThreadsAndInitOp() {
RETURN_STATUS_UNEXPECTED("tree_ not set");
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1)));
......
......@@ -168,7 +168,7 @@ Status GeneratorOp::FillBuffer(TensorQTable *tt) {
Status GeneratorOp::operator()() {
// Handshake with TaskManager to synchronize thread creation
TaskManager::FindMe()->Post();
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
std::unique_ptr<DataBuffer> fetched_buffer;
bool eof = false;
while (!eof) {
......
......@@ -386,7 +386,7 @@ Status ImageFolderOp::LaunchThreadsAndInitOp() {
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
// The following code launch 3 threads group
// 1) A thread that walks all folders and push the folder names to a util:Queue mFoldernameQueue.
// 2) Workers that pull foldername from mFoldernameQueue, walk it and return the sorted images to mImagenameQueue
......
......@@ -140,7 +140,7 @@ Status ManifestOp::LaunchThreadsAndInitOp() {
RETURN_STATUS_UNEXPECTED("tree_ not set");
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1)));
......
......@@ -644,7 +644,7 @@ Status MindRecordOp::LaunchThreadAndInitOp() {
}
RETURN_IF_NOT_OK(io_blk_queues_.Register(tree_->AllTasks()));
shard_reader_wait_post_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(shard_reader_wait_post_.Register(tree_->AllTasks()));
if (shard_reader_->Launch(!block_reader_) == MSRStatus::FAILED) {
RETURN_STATUS_UNEXPECTED("MindRecordOp launch failed.");
}
......
......@@ -395,7 +395,7 @@ Status MnistOp::LaunchThreadsAndInitOp() {
RETURN_STATUS_UNEXPECTED("tree_ not set");
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1)));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->WalkAllFiles());
......
......@@ -370,7 +370,7 @@ Status TextFileOp::operator()() {
// must be called after launching workers.
TaskManager::FindMe()->Post();
io_block_queue_wait_post_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
NotifyToFillIOBlockQueue();
while (!finished_reading_dataset_) {
int64_t buffer_id = 0;
......
......@@ -222,7 +222,7 @@ Status TFReaderOp::operator()() {
// so workers have to be kept alive until the end of the program
TaskManager::FindMe()->Post();
io_block_queue_wait_post_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
NotifyToFillIOBlockQueue();
while (!finished_reading_dataset_) {
......
......@@ -231,7 +231,7 @@ Status VOCOp::LaunchThreadsAndInitOp() {
RETURN_STATUS_UNEXPECTED("tree_ not set");
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
wp_.Register(tree_->AllTasks());
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1)));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->ParseImageIds());
......
......@@ -3,7 +3,6 @@ add_library(utils OBJECT
circular_pool.cc
memory_pool.cc
cond_var.cc
semaphore.cc
intrp_service.cc
task.cc
task_manager.cc
......
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/semaphore.h"
#include "dataset/util/task_manager.h"
namespace mindspore {
namespace dataset {
Status Semaphore::P() {
std::unique_lock<std::mutex> lck(mutex_);
return (wait_cond_.Wait(&lck, [this]() { return value_ != 0; }));
}
void Semaphore::V() {
std::unique_lock<std::mutex> lck(mutex_);
++value_;
wait_cond_.NotifyOne();
}
void Semaphore::Register(TaskGroup *vg) { (void)wait_cond_.Register(vg->GetIntrpService()); }
Status Semaphore::Deregister() { return (wait_cond_.Deregister()); }
void Semaphore::ResetIntrpState() { wait_cond_.ResetIntrpState(); }
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_SEMAPHORE_H_
#define DATASET_UTIL_SEMAPHORE_H_
#include "dataset/util/cond_var.h"
namespace mindspore {
namespace dataset {
class TaskGroup;
class Semaphore {
public:
explicit Semaphore(int init) : value_(init) {}
virtual ~Semaphore() {}
Status P();
void V();
void Register(TaskGroup *vg);
Status Deregister();
void ResetIntrpState();
private:
int value_;
std::mutex mutex_;
CondVar wait_cond_;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_SEMAPHORE_H_
......@@ -53,7 +53,7 @@ Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::funct
LockGuard lck(&tg_lock_);
this->grp_list_.insert(vg);
}
(*task)->wp_.Register(vg);
RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
RETURN_IF_NOT_OK((*task)->Run());
// Wait for the thread to initialize successfully.
RETURN_IF_NOT_OK((*task)->Wait());
......
......@@ -36,7 +36,7 @@ void WaitPost::Clear() {
value_ = 0;
}
void WaitPost::Register(TaskGroup *vg) { (void)wait_cond_.Register(vg->GetIntrpService()); }
Status WaitPost::Register(TaskGroup *vg) { return wait_cond_.Register(vg->GetIntrpService()); }
void WaitPost::ResetIntrpState() { wait_cond_.ResetIntrpState(); }
......
......@@ -36,7 +36,7 @@ class WaitPost {
void Clear();
void Register(TaskGroup *vg);
Status Register(TaskGroup *vg);
Status Deregister();
......
......@@ -20,7 +20,6 @@
#include "dataset/util/intrp_service.h"
#include "dataset/util/task_manager.h"
#include "dataset/util/queue.h"
#include "dataset/util/semaphore.h"
using namespace mindspore::dataset;
using mindspore::MsLogLevel::INFO;
......@@ -55,11 +54,12 @@ TEST_F(MindDataTestIntrpService, Test1) {
TEST_F(MindDataTestIntrpService, Test2) {
MS_LOG(INFO) << "Test Semaphore";
Status rc;
Semaphore sem(0);
sem.Register(&vg_);
WaitPost wp;
rc = wp.Register(&vg_);
EXPECT_TRUE(rc.IsOk());
vg_.CreateAsyncTask("Test1", [&]() -> Status {
TaskManager::FindMe()->Post();
Status rc = sem.P();
Status rc = wp.Wait();
EXPECT_TRUE(rc.IsInterrupted());
return rc;
});
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册