Commit 53875625 authored by tensor-tang

add multi-thread test

Parent 733718c3
@@ -15,11 +15,7 @@ limitations under the License. */
 #include <sys/time.h>
 #include <time.h>
 #include <fstream>
-#include <iostream>
-#include <sstream>
-#include <string>
 #include <thread>  // NOLINT
-#include <vector>
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
 #include "paddle/fluid/inference/tests/test_helper.h"
@@ -41,19 +37,18 @@ inline double get_current_ms() {
 // return size of total words
 size_t read_datasets(std::vector<paddle::framework::LoDTensor>* out,
                      const std::string& filename) {
-  using namespace std;  // NOLINT
   size_t sz = 0;
-  fstream fin(filename);
-  string line;
+  std::fstream fin(filename);
+  std::string line;
   out->clear();
   while (getline(fin, line)) {
-    istringstream iss(line);
-    vector<int64_t> ids;
-    string field;
+    std::istringstream iss(line);
+    std::vector<int64_t> ids;
+    std::string field;
     while (getline(iss, field, ' ')) {
       ids.push_back(stoi(field));
     }
-    if (ids.size() >= 1024 ) {
+    if (ids.size() >= 1024) {
       continue;
     }
@@ -69,72 +64,61 @@ size_t read_datasets(std::vector<paddle::framework::LoDTensor>* out,
   return sz;
 }
 
-void test_multi_threads() {
-  /*
-  size_t jobs_per_thread = std::min(inputdatas.size() / FLAGS_num_threads,
-                                    inputdatas.size());
-  std::vector<size_t> workers(FLAGS_num_threads, jobs_per_thread);
-  workers[FLAGS_num_threads - 1] += inputdatas.size() % FLAGS_num_threads;
-
-  std::vector<std::unique_ptr<std::thread>> infer_threads;
-
-  for (size_t i = 0; i < workers.size(); ++i) {
-    infer_threads.emplace_back(new std::thread([&, i]() {
-      size_t start = i * jobs_per_thread;
-      for (size_t j = start; j < start + workers[i]; ++j ) {
-        // 0. Call `paddle::framework::InitDevices()` initialize all the
-        devices
-        // In unittests, this is done in paddle/testing/paddle_gtest_main.cc
-        paddle::framework::LoDTensor words;
-        auto& srcdata = inputdatas[j];
-        paddle::framework::LoD lod{{0, srcdata.size()}};
-        words.set_lod(lod);
-        int64_t* pdata = words.mutable_data<int64_t>(
-            {static_cast<int64_t>(srcdata.size()), 1},
-            paddle::platform::CPUPlace());
-        memcpy(pdata, srcdata.data(), words.numel() * sizeof(int64_t));
-
-        LOG(INFO) << "thread id: " << i << ", words size:" << words.numel();
-        std::vector<paddle::framework::LoDTensor*> cpu_feeds;
-        cpu_feeds.push_back(&words);
-
-        paddle::framework::LoDTensor output1;
-        std::vector<paddle::framework::LoDTensor*> cpu_fetchs1;
-        cpu_fetchs1.push_back(&output1);
-
-        // Run inference on CPU
-        if (FLAGS_prepare_vars) {
-          if (FLAGS_prepare_context) {
-            TestInference<paddle::platform::CPUPlace, false, true>(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          } else {
-            TestInference<paddle::platform::CPUPlace, false, false>(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          }
-        } else {
-          if (FLAGS_prepare_context) {
-            TestInference<paddle::platform::CPUPlace, true, true>(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          } else {
-            TestInference<paddle::platform::CPUPlace, true, false>(
-                dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, model_combined,
-                FLAGS_use_mkldnn);
-          }
-        }
-        //LOG(INFO) << output1.lod();
-        //LOG(INFO) << output1.dims();
-      }
-    }));
-  }
-  auto start_ms = get_current_ms();
-  for (int i = 0; i < FLAGS_num_threads; ++i) {
-    infer_threads[i]->join();
-  }
-  auto stop_ms = get_current_ms();
-  LOG(INFO) << "total: " << stop_ms - start_ms << " ms";*/
-}
+void ThreadRunInfer(
+    const int tid, paddle::framework::Executor* executor,
+    paddle::framework::Scope* scope,
+    const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
+    const std::vector<std::vector<const paddle::framework::LoDTensor*>>& jobs) {
+  auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
+      new paddle::framework::ProgramDesc(*inference_program));
+  std::string feed_holder_name = "feed_" + paddle::string::to_string(tid);
+  std::string fetch_holder_name = "fetch_" + paddle::string::to_string(tid);
+  copy_program->SetFeedHolderName(feed_holder_name);
+  copy_program->SetFetchHolderName(fetch_holder_name);
+
+  // 3. Get the feed_target_names and fetch_target_names
+  const std::vector<std::string>& feed_target_names =
+      copy_program->GetFeedTargetNames();
+  const std::vector<std::string>& fetch_target_names =
+      copy_program->GetFetchTargetNames();
+
+  PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
+  std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
+  paddle::framework::LoDTensor outtensor;
+  fetch_targets[fetch_target_names[0]] = &outtensor;
+
+  std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
+  PADDLE_ENFORCE_EQ(feed_target_names.size(), 1UL);
+
+  auto& inputs = jobs[tid];
+  auto start_ms = get_current_ms();
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    feed_targets[feed_target_names[0]] = inputs[i];
+    executor->Run(*copy_program, scope, &feed_targets, &fetch_targets, true,
+                  true, feed_holder_name, fetch_holder_name);
+  }
+  auto stop_ms = get_current_ms();
+  LOG(INFO) << "Tid: " << tid << ", process " << inputs.size()
+            << " samples, avg time per sample: "
+            << (stop_ms - start_ms) / inputs.size() << " ms";
+}
+
+void bcast_datasets(
+    const std::vector<paddle::framework::LoDTensor>& datasets,
+    std::vector<std::vector<const paddle::framework::LoDTensor*>>* jobs,
+    const int num_threads) {
+  size_t s = 0;
+  jobs->resize(num_threads);
+  while (s < datasets.size()) {
+    for (auto it = jobs->begin(); it != jobs->end(); it++) {
+      it->emplace_back(&datasets[s]);
+      s++;
+      if (s >= datasets.size()) {
+        break;
+      }
+    }
+  }
+}
 
 TEST(inference, nlp) {
@@ -166,7 +150,18 @@ TEST(inference, nlp) {
   }
 
   if (FLAGS_num_threads > 1) {
-    test_multi_threads();
+    std::vector<std::vector<const paddle::framework::LoDTensor*>> jobs;
+    bcast_datasets(datasets, &jobs, FLAGS_num_threads);
+    std::vector<std::unique_ptr<std::thread>> threads;
+    for (int i = 0; i < FLAGS_num_threads; ++i) {
+      threads.emplace_back(new std::thread(ThreadRunInfer, i, &executor, scope,
+                                           std::ref(inference_program),
+                                           std::ref(jobs)));
+    }
+    for (int i = 0; i < FLAGS_num_threads; ++i) {
+      threads[i]->join();
+    }
   } else {
     if (FLAGS_prepare_vars) {
       executor.CreateVariables(*inference_program, scope, 0);
@@ -200,14 +195,6 @@ TEST(inference, nlp) {
   LOG(INFO) << "Total infer time: " << (stop_ms - start_ms) / 1000.0 / 60
             << " min, avg time per seq: "
             << (stop_ms - start_ms) / datasets.size() << " ms";
-  //  { // just for test
-  //    auto* scope = new paddle::framework::Scope();
-  //    paddle::framework::LoDTensor outtensor;
-  //    TestInference<paddle::platform::CPUPlace, false, true>(
-  //        dirname, {&(datasets[0])}, {&outtensor}, FLAGS_repeat, model_combined,
-  //        false);
-  //    delete scope;
-  //  }
   }
   delete scope;
 }
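Note on the job split: the round-robin assignment performed by bcast_datasets above may be easier to see in isolation. Below is a minimal, standalone C++ sketch (not part of this commit) that reproduces the same distribution with plain integers standing in for the LoDTensor pointers; num_threads and jobs simply mirror the function's parameters.

// Standalone sketch of the round-robin split used by bcast_datasets above.
// Samples are dealt one at a time across num_threads buckets, so bucket
// sizes differ by at most one.
#include <cstdio>
#include <vector>

int main() {
  const int num_threads = 4;      // stands in for FLAGS_num_threads
  std::vector<int> datasets(10);  // stands in for the LoDTensor dataset
  for (int i = 0; i < 10; ++i) datasets[i] = i;

  std::vector<std::vector<const int*>> jobs(num_threads);
  size_t s = 0;
  while (s < datasets.size()) {
    for (auto it = jobs.begin(); it != jobs.end(); ++it) {
      it->emplace_back(&datasets[s]);
      ++s;
      if (s >= datasets.size()) break;
    }
  }

  // Expected split for 10 samples over 4 threads: 3, 3, 2, 2.
  for (int tid = 0; tid < num_threads; ++tid) {
    std::printf("thread %d gets %zu samples\n", tid, jobs[tid].size());
  }
  return 0;
}

Compared with the block partitioning in the removed test_multi_threads() (jobs_per_thread = size / num_threads, remainder dumped on the last thread), this keeps per-thread workloads within one sample of each other.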