From 5387562576de020a35f864a07f14802b68ee398d Mon Sep 17 00:00:00 2001
From: tensor-tang
Date: Fri, 1 Jun 2018 14:07:41 +0800
Subject: [PATCH] add multi-thread test

---
 .../tests/book/test_inference_nlp.cc | 157 ++++++++----------
 1 file changed, 72 insertions(+), 85 deletions(-)

diff --git a/paddle/fluid/inference/tests/book/test_inference_nlp.cc b/paddle/fluid/inference/tests/book/test_inference_nlp.cc
index 5241661fb31..4e92d6a17b0 100644
--- a/paddle/fluid/inference/tests/book/test_inference_nlp.cc
+++ b/paddle/fluid/inference/tests/book/test_inference_nlp.cc
@@ -15,11 +15,7 @@ limitations under the License. */
 #include <sys/time.h>
 #include <time.h>
 #include <fstream>
-#include <iostream>
-#include <sstream>
-#include <string>
 #include <thread>  // NOLINT
-#include <vector>
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
 #include "paddle/fluid/inference/tests/test_helper.h"
@@ -41,19 +37,18 @@ inline double get_current_ms() {
 // return size of total words
 size_t read_datasets(std::vector<paddle::framework::LoDTensor>* out,
                      const std::string& filename) {
-  using namespace std;  // NOLINT
   size_t sz = 0;
-  fstream fin(filename);
-  string line;
+  std::fstream fin(filename);
+  std::string line;
   out->clear();
   while (getline(fin, line)) {
-    istringstream iss(line);
-    vector<int64_t> ids;
-    string field;
+    std::istringstream iss(line);
+    std::vector<int64_t> ids;
+    std::string field;
     while (getline(iss, field, ' ')) {
       ids.push_back(stoi(field));
     }
-    if (ids.size() >= 1024 ) {
+    if (ids.size() >= 1024) {
       continue;
     }
 
@@ -69,72 +64,61 @@ size_t read_datasets(std::vector<paddle::framework::LoDTensor>* out,
   return sz;
 }
 
-void test_multi_threads() {
-  /*
-  size_t jobs_per_thread = std::min(inputdatas.size() / FLAGS_num_threads,
-                                    inputdatas.size());
-  std::vector<size_t> workers(FLAGS_num_threads, jobs_per_thread);
-  workers[FLAGS_num_threads - 1] += inputdatas.size() % FLAGS_num_threads;
-
-  std::vector<std::unique_ptr<std::thread>> infer_threads;
-
-  for (size_t i = 0; i < workers.size(); ++i) {
-    infer_threads.emplace_back(new std::thread([&, i]() {
-      size_t start = i * jobs_per_thread;
-      for (size_t j = start; j < start + workers[i]; ++j ) {
-        // 0. Call `paddle::framework::InitDevices()` initialize all the
-        devices
-        // In unittests, this is done in paddle/testing/paddle_gtest_main.cc
-        paddle::framework::LoDTensor words;
-        auto& srcdata = inputdatas[j];
-        paddle::framework::LoD lod{{0, srcdata.size()}};
-        words.set_lod(lod);
-        int64_t* pdata = words.mutable_data<int64_t>(
-            {static_cast<int64_t>(srcdata.size()), 1},
-            paddle::platform::CPUPlace());
-        memcpy(pdata, srcdata.data(), words.numel() * sizeof(int64_t));
-
-        LOG(INFO) << "thread id: " << i << ", words size:" << words.numel();
-        std::vector<paddle::framework::LoDTensor*> cpu_feeds;
-        cpu_feeds.push_back(&words);
-
-        paddle::framework::LoDTensor output1;
-        std::vector<paddle::framework::LoDTensor*> cpu_fetchs1;
-        cpu_fetchs1.push_back(&output1);
-
-        // Run inference on CPU
-        if (FLAGS_prepare_vars) {
-          if (FLAGS_prepare_context) {
-            TestInference(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          } else {
-            TestInference(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          }
-        } else {
-          if (FLAGS_prepare_context) {
-            TestInference(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          } else {
-            TestInference(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          }
-        }
-        //LOG(INFO) << output1.lod();
-        //LOG(INFO) << output1.dims();
-      }
-    }));
-  }
-  auto start_ms = get_current_ms();
-  for (int i = 0; i < FLAGS_num_threads; ++i) {
-    infer_threads[i]->join();
+void ThreadRunInfer(
+    const int tid, paddle::framework::Executor* executor,
+    paddle::framework::Scope* scope,
+    const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
+    const std::vector<std::vector<const paddle::framework::LoDTensor*>>& jobs) {
+  auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
+      new paddle::framework::ProgramDesc(*inference_program));
+  std::string feed_holder_name = "feed_" + paddle::string::to_string(tid);
+  std::string fetch_holder_name = "fetch_" + paddle::string::to_string(tid);
+  copy_program->SetFeedHolderName(feed_holder_name);
+  copy_program->SetFetchHolderName(fetch_holder_name);
+
+  // 3. Get the feed_target_names and fetch_target_names
+  const std::vector<std::string>& feed_target_names =
+      copy_program->GetFeedTargetNames();
+  const std::vector<std::string>& fetch_target_names =
+      copy_program->GetFetchTargetNames();
+
+  PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
+  std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
+  paddle::framework::LoDTensor outtensor;
+  fetch_targets[fetch_target_names[0]] = &outtensor;
+
+  std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
+  PADDLE_ENFORCE_EQ(feed_target_names.size(), 1UL);
+
+  auto& inputs = jobs[tid];
+  auto start_ms = get_current_ms();
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    feed_targets[feed_target_names[0]] = inputs[i];
+    executor->Run(*copy_program, scope, &feed_targets, &fetch_targets, true,
+                  true, feed_holder_name, fetch_holder_name);
+  }
+  auto stop_ms = get_current_ms();
+  LOG(INFO) << "Tid: " << tid << ", process " << inputs.size()
+            << " samples, avg time per sample: "
+
+            << (stop_ms - start_ms) / inputs.size() << " ms";
+}
+
+void bcast_datasets(
+    const std::vector<paddle::framework::LoDTensor>& datasets,
+    std::vector<std::vector<const paddle::framework::LoDTensor*>>* jobs,
+    const int num_threads) {
+  size_t s = 0;
+  jobs->resize(num_threads);
+  while (s < datasets.size()) {
+    for (auto it = jobs->begin(); it != jobs->end(); it++) {
+      it->emplace_back(&datasets[s]);
+      s++;
+      if (s >= datasets.size()) {
+        break;
+      }
+    }
   }
-  auto stop_ms = get_current_ms();
-  LOG(INFO) << "total: " << stop_ms - start_ms << " ms";*/
 }
 
 TEST(inference, nlp) {
@@ -166,7 +150,18 @@ TEST(inference, nlp) {
   }
 
   if (FLAGS_num_threads > 1) {
-    test_multi_threads();
+    std::vector<std::vector<const paddle::framework::LoDTensor*>> jobs;
+    bcast_datasets(datasets, &jobs, FLAGS_num_threads);
+    std::vector<std::unique_ptr<std::thread>> threads;
+    for (int i = 0; i < FLAGS_num_threads; ++i) {
+      threads.emplace_back(new std::thread(ThreadRunInfer, i, &executor, scope,
+                                           std::ref(inference_program),
+                                           std::ref(jobs)));
+    }
+    for (int i = 0; i < FLAGS_num_threads; ++i) {
+      threads[i]->join();
+    }
+
   } else {
     if (FLAGS_prepare_vars) {
       executor.CreateVariables(*inference_program, scope, 0);
@@ -200,14 +195,6 @@ TEST(inference, nlp) {
     LOG(INFO) << "Total infer time: " << (stop_ms - start_ms) / 1000.0 / 60
               << " min, avg time per seq: "
               << (stop_ms - start_ms) / datasets.size() << " ms";
-    // { // just for test
-    //   auto* scope = new paddle::framework::Scope();
-    //   paddle::framework::LoDTensor outtensor;
-    //   TestInference(
-    //       dirname, {&(datasets[0])}, {&outtensor}, FLAGS_repeat, model_combined,
-    //       false);
-    //   delete scope;
-    // }
   }
   delete scope;
 }
-- 
GitLab
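
For reference outside the patch itself: the FLAGS_num_threads > 1 branch above pairs a round-robin split of the dataset (bcast_datasets) with one ThreadRunInfer worker per job list, joined at the end. The standalone sketch below is illustrative only (BcastDatasets and the plain int samples are stand-ins, not Paddle APIs); it shows the same split-and-join pattern and builds with any C++11 compiler.

// Illustrative sketch: round-robin job split plus per-thread join, mirroring
// bcast_datasets() and the threaded branch of TEST(inference, nlp). Plain
// ints stand in for paddle::framework::LoDTensor samples.
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

// Distribute dataset items across num_threads job lists in round-robin order.
void BcastDatasets(const std::vector<int>& datasets,
                   std::vector<std::vector<const int*>>* jobs,
                   const int num_threads) {
  size_t s = 0;
  jobs->resize(num_threads);
  while (s < datasets.size()) {
    for (auto it = jobs->begin(); it != jobs->end(); ++it) {
      it->emplace_back(&datasets[s]);
      ++s;
      if (s >= datasets.size()) {
        break;
      }
    }
  }
}

int main() {
  const int num_threads = 4;
  std::vector<int> datasets(10, 1);  // ten dummy samples
  std::vector<std::vector<const int*>> jobs;
  BcastDatasets(datasets, &jobs, num_threads);

  // Each worker consumes only jobs[tid], like ThreadRunInfer(tid, ...).
  std::vector<size_t> processed(num_threads, 0);
  std::vector<std::unique_ptr<std::thread>> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back(new std::thread(
        [&jobs, &processed, i]() { processed[i] = jobs[i].size(); }));
  }
  for (int i = 0; i < num_threads; ++i) {
    threads[i]->join();
    std::cout << "tid " << i << " got " << processed[i] << " samples\n";
  }
  return 0;
}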