Commit 7411df34 authored by Yang Yang

add multi thread

Parent 8ee17e96
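The change below converts the per-place executor runs in ParallelDoOp and ParallelDoGradOp into worker threads that are joined before outputs and gradients are merged. As a minimal standalone sketch of that fork-join pattern (plain C++ with illustrative names such as `RunOnPlace`, not Paddle APIs):

```cpp
// One worker thread per place (device), each running its own workload,
// followed by a join over all workers before results are merged.
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

void RunOnPlace(size_t place_idx) {
  // Stand-in for running one block with an executor on one device.
  std::cout << "running block on place " << place_idx << std::endl;
}

int main() {
  const size_t num_places = 4;  // e.g. number of devices
  std::vector<std::thread> workers;
  workers.reserve(num_places);
  for (size_t place_idx = 0; place_idx < num_places; ++place_idx) {
    workers.push_back(std::thread([place_idx] { RunOnPlace(place_idx); }));
  }
  for (auto &worker : workers) {
    worker.join();  // wait for every place to finish before merging
  }
  return 0;
}
```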
@@ -12,6 +12,7 @@
 See the License for the specific language governing permissions and
 limitations under the License. */
+#include <thread>
 #include <vector>
 #include "paddle/framework/executor.h"
@@ -44,7 +45,7 @@ void SplitTensorAndMoveTensorToScopes(
   auto lod_tensors = tensor.SplitLoDTensor(places);
   for (auto &lod : lod_tensors) {
-    LOG(INFO) << lod.dims();
+    VLOG(3) << lod.dims();
   }
   for (size_t i = 0; i < sub_scopes.size(); ++i) {
@@ -84,6 +85,7 @@ class ParallelDoOp : public framework::OperatorBase {
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(kInputs));
+    std::vector<std::thread> workers;
     for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
       VLOG(3) << "Run " << place_idx;
@@ -96,9 +98,14 @@ class ParallelDoOp : public framework::OperatorBase {
       }
       // execute
-      auto executor = framework::Executor(place);
-      executor.Run(*program, cur_scope, block->ID(),
-                   false /*create_local_scope*/);
+      workers.push_back(std::thread([program, cur_scope, place, block] {
+        auto executor = framework::Executor(place);
+        executor.Run(*program, cur_scope, block->ID(),
+                     false /*create_local_scope*/);
+      }));
+    }
+    for (auto &worker : workers) {
+      worker.join();
     }
     // merge output
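The lambda handed to each worker thread captures `program`, `cur_scope`, `place`, and `block` by value, so every thread keeps its own copy of the per-iteration state instead of racing on loop variables. A minimal sketch of why by-value capture matters in a loop that spawns threads (illustrative code, not Paddle's):

```cpp
#include <iostream>
#include <thread>
#include <vector>

int main() {
  std::vector<std::thread> workers;
  std::vector<int> results(4, 0);
  for (int i = 0; i < 4; ++i) {
    // Capture i by value: each thread sees the index it was launched with.
    // Capturing it by reference would let later iterations mutate it
    // underneath already-running threads.
    workers.push_back(std::thread([i, &results] { results[i] = i * i; }));
  }
  for (auto &worker : workers) worker.join();
  for (int r : results) std::cout << r << " ";  // prints: 0 1 4 9
  std::cout << std::endl;
  return 0;
}
```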
@@ -162,14 +169,15 @@ class ParallelDoGradOp : public OperatorBase {
                                      Inputs(framework::GradVarName(kOutputs)));
     for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
-      LOG(INFO) << s;
-      LOG(INFO) << scope.FindVar(s)->Get<LoDTensor>();
+      VLOG(3) << s;
+      VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();
       for (auto *sub_scope : sub_scopes) {
-        LOG(INFO) << sub_scope->FindVar(s)->Get<LoDTensor>();
+        VLOG(3) << sub_scope->FindVar(s)->Get<LoDTensor>();
       }
     }
     // exe run
+    std::vector<std::thread> workers;
     for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
       VLOG(3) << "Run " << place_idx;
@@ -177,25 +185,30 @@ class ParallelDoGradOp : public OperatorBase {
       auto *cur_scope = sub_scopes[place_idx];
       // execute
-      auto executor = framework::Executor(place);
-      executor.Run(*program, cur_scope, block->ID(),
-                   false /*create_local_scope*/);
+      workers.push_back(std::thread([program, cur_scope, place, block] {
+        auto executor = framework::Executor(place);
+        executor.Run(*program, cur_scope, block->ID(),
+                     false /*create_local_scope*/);
+      }));
+    }
+    for (auto &worker : workers) {
+      worker.join();
     }
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
-      LOG(INFO) << s;
+      VLOG(3) << s;
       auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
-      LOG(INFO) << t;
+      VLOG(3) << t;
       std::string s_buf = s + "@BUF";
       auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable<LoDTensor>();
       for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) {
         auto &tt = sub_scopes[place_idx]->FindVar(s)->Get<LoDTensor>();
-        LOG(INFO) << place_idx;
-        LOG(INFO) << tt;
+        VLOG(3) << place_idx;
+        VLOG(3) << tt;
         framework::CopyFrom(tt, places[0], t_buf);
         auto sum_op = framework::OpRegistry::CreateOp(
@@ -204,7 +217,7 @@ class ParallelDoGradOp : public OperatorBase {
         sum_op->Run(*sub_scopes[0], place);
       }
-      LOG(INFO) << t;
+      VLOG(3) << t;
       framework::CopyFrom(t, place, scope.FindVar(s)->GetMutable<LoDTensor>());
     }
   }
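The gradient merge above copies each replica's gradient into a staging tensor (`s + "@BUF"`) on the first place and runs a sum op, so that replica 0 ends up holding the total before it is copied back to the parent scope. A sketch of that accumulate-into-replica-0 pattern, with `std::vector<float>` standing in for LoDTensor and `Copy`/`SumInto` as illustrative helpers rather than Paddle APIs:

```cpp
#include <cassert>
#include <vector>

void Copy(const std::vector<float> &src, std::vector<float> *dst) { *dst = src; }

void SumInto(const std::vector<float> &buf, std::vector<float> *acc) {
  for (size_t i = 0; i < acc->size(); ++i) (*acc)[i] += buf[i];
}

int main() {
  // One gradient tensor per place (replica).
  std::vector<std::vector<float>> grads = {{1, 2}, {3, 4}, {5, 6}};
  std::vector<float> buf(grads[0].size());          // the "@BUF" staging tensor
  for (size_t place_idx = 1; place_idx < grads.size(); ++place_idx) {
    Copy(grads[place_idx], &buf);                   // like framework::CopyFrom(...)
    SumInto(buf, &grads[0]);                        // like sum_op->Run(...)
  }
  assert(grads[0][0] == 9 && grads[0][1] == 12);    // 1+3+5, 2+4+6
  return 0;
}
```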
@@ -219,7 +232,7 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
     auto *grad = new framework::OpDesc();
     grad->SetType("parallel_do_grad");
     for (auto &input_param : this->InputNames()) {
-      LOG(INFO) << input_param;
+      VLOG(3) << input_param;
       grad->SetInput(input_param, this->Input(input_param));
       grad->SetOutput(framework::GradVarName(input_param),
                       this->InputGrad(input_param, false));
......
@@ -12,7 +12,7 @@ import paddle.v2.fluid.core as core
 class ParallelOpTest(unittest.TestCase):
     def setUp(self):
         x = layers.data(
-            shape=[-1, 3, 4],
+            shape=[-1, 30, 40],
             dtype='float32',
             name='x',
             append_batch_size=False,
@@ -35,7 +35,7 @@ class ParallelOpTest(unittest.TestCase):
         exe.run(fluid.default_main_program(),
                 feed={
                     x.name: np.random.uniform(0.1, 0.6,
-                                              (2, 3, 4)).astype("float32")
+                                              (20, 30, 40)).astype("float32")
                 })
     def test_forward(self):
......
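The test's input grows from shape (2, 3, 4) to (20, 30, 40), which gives the leading batch axis enough rows to distribute across the parallel places. A minimal sketch of such an even split along the first dimension (plain C++ with a hypothetical `SplitBatch` helper, not the actual LoDTensor split):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Split `batch` rows as evenly as possible across `num_places` devices.
std::vector<size_t> SplitBatch(size_t batch, size_t num_places) {
  std::vector<size_t> per_place(num_places, batch / num_places);
  for (size_t i = 0; i < batch % num_places; ++i) ++per_place[i];
  return per_place;
}

int main() {
  for (size_t rows : SplitBatch(20, 4)) std::cout << rows << " ";  // 5 5 5 5
  std::cout << std::endl;
  return 0;
}
```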