diff --git a/paddle/operators/parallel_do_op.cc b/paddle/operators/parallel_do_op.cc
index a81ddb25c4ee9892a904696088cb93517536494c..c32884f8c283a906ef54288ff8295557f72b654b 100644
--- a/paddle/operators/parallel_do_op.cc
+++ b/paddle/operators/parallel_do_op.cc
@@ -12,6 +12,7 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
+#include <thread>
 #include <vector>
 
 #include "paddle/framework/executor.h"
@@ -44,7 +45,7 @@ void SplitTensorAndMoveTensorToScopes(
   auto lod_tensors = tensor.SplitLoDTensor(places);
 
   for (auto &lod : lod_tensors) {
-    LOG(INFO) << lod.dims();
+    VLOG(3) << lod.dims();
   }
 
   for (size_t i = 0; i < sub_scopes.size(); ++i) {
@@ -84,6 +85,7 @@ class ParallelDoOp : public framework::OperatorBase {
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(kInputs));
 
+    std::vector<std::thread> workers;
     for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
       VLOG(3) << "Run " << place_idx;
 
@@ -96,9 +98,14 @@ class ParallelDoOp : public framework::OperatorBase {
       }
 
       // execute
-      auto executor = framework::Executor(place);
-      executor.Run(*program, cur_scope, block->ID(),
-                   false /*create_local_scope*/);
+      workers.push_back(std::thread([program, cur_scope, place, block] {
+        auto executor = framework::Executor(place);
+        executor.Run(*program, cur_scope, block->ID(),
+                     false /*create_local_scope*/);
+      }));
+    }
+    for (auto &worker : workers) {
+      worker.join();
     }
 
     // merge output
@@ -162,14 +169,15 @@ class ParallelDoGradOp : public OperatorBase {
                                      Inputs(framework::GradVarName(kOutputs)));
 
     for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
-      LOG(INFO) << s;
-      LOG(INFO) << scope.FindVar(s)->Get<framework::LoDTensor>();
+      VLOG(3) << s;
+      VLOG(3) << scope.FindVar(s)->Get<framework::LoDTensor>();
       for (auto *sub_scope : sub_scopes) {
-        LOG(INFO) << sub_scope->FindVar(s)->Get<framework::LoDTensor>();
+        VLOG(3) << sub_scope->FindVar(s)->Get<framework::LoDTensor>();
       }
     }
 
     // exe run
+    std::vector<std::thread> workers;
     for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
       VLOG(3) << "Run " << place_idx;
 
@@ -177,25 +185,30 @@ class ParallelDoGradOp : public OperatorBase {
       auto *cur_scope = sub_scopes[place_idx];
 
       // execute
-      auto executor = framework::Executor(place);
-      executor.Run(*program, cur_scope, block->ID(),
-                   false /*create_local_scope*/);
+      workers.push_back(std::thread([program, cur_scope, place, block] {
+        auto executor = framework::Executor(place);
+        executor.Run(*program, cur_scope, block->ID(),
+                     false /*create_local_scope*/);
+      }));
+    }
+    for (auto &worker : workers) {
+      worker.join();
     }
 
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
-      LOG(INFO) << s;
+      VLOG(3) << s;
 
       auto &t = sub_scopes[0]->FindVar(s)->Get<framework::LoDTensor>();
-      LOG(INFO) << t;
+      VLOG(3) << t;
 
       std::string s_buf = s + "@BUF";
       auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable<framework::LoDTensor>();
 
       for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) {
         auto &tt = sub_scopes[place_idx]->FindVar(s)->Get<framework::LoDTensor>();
-        LOG(INFO) << place_idx;
-        LOG(INFO) << tt;
+        VLOG(3) << place_idx;
+        VLOG(3) << tt;
         framework::CopyFrom(tt, places[0], t_buf);
 
         auto sum_op = framework::OpRegistry::CreateOp(
@@ -204,7 +217,7 @@ class ParallelDoGradOp : public OperatorBase {
         sum_op->Run(*sub_scopes[0], place);
       }
 
-      LOG(INFO) << t;
+      VLOG(3) << t;
       framework::CopyFrom(t, place, scope.FindVar(s)->GetMutable<framework::LoDTensor>());
     }
   }
@@ -219,7 +232,7 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
     auto *grad = new framework::OpDesc();
     grad->SetType("parallel_do_grad");
     for (auto &input_param : this->InputNames()) {
-      LOG(INFO) << input_param;
+      VLOG(3) << input_param;
       grad->SetInput(input_param, this->Input(input_param));
       grad->SetOutput(framework::GradVarName(input_param),
                       this->InputGrad(input_param, false));
diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index c39040869d1d8d5f09989d82827738976e9fb247..2788f4e519b31b45250fbb923b2309e8bb1f6fa1 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -12,7 +12,7 @@ import paddle.v2.fluid.core as core
 class ParallelOpTest(unittest.TestCase):
     def setUp(self):
         x = layers.data(
-            shape=[-1, 3, 4],
+            shape=[-1, 30, 40],
             dtype='float32',
             name='x',
             append_batch_size=False,
@@ -35,7 +35,7 @@ class ParallelOpTest(unittest.TestCase):
         exe.run(fluid.default_main_program(),
                 feed={
                     x.name: np.random.uniform(0.1, 0.6,
-                                              (2, 3, 4)).astype("float32")
+                                              (20, 30, 40)).astype("float32")
                 })
 
     def test_forward(self):
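
For reference, here is a minimal standalone sketch of the spawn-and-join pattern this diff introduces in both `ParallelDoOp` and `ParallelDoGradOp`: one `std::thread` per place, each running its own executor over the sub-block, and all workers joined before the outputs (or gradients) are merged. `RunBlockOnPlace` and `num_places` are illustrative stand-ins, not Paddle APIs.

```cpp
// Standalone sketch (not Paddle code): one worker thread per place,
// joined before any merge step runs.
#include <cstdio>
#include <thread>
#include <vector>

// Stand-in for framework::Executor(place).Run(...) on a single device.
void RunBlockOnPlace(size_t place_idx) {
  std::printf("running sub-block on place %zu\n", place_idx);
}

int main() {
  const size_t num_places = 4;  // e.g. the number of CPU/GPU places
  std::vector<std::thread> workers;
  for (size_t place_idx = 0; place_idx < num_places; ++place_idx) {
    // Capture by value so each worker sees its own place index.
    workers.push_back(std::thread([place_idx] { RunBlockOnPlace(place_idx); }));
  }
  for (auto &worker : workers) {
    worker.join();  // wait for every place before merging outputs/gradients
  }
  return 0;
}
```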
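
A similarly hedged sketch of the gradient-merge step that follows the join in `ParallelDoGradOp`: the gradient computed on each extra place is staged into a buffer on place 0 and accumulated into the place-0 tensor, mirroring the `CopyFrom` + `sum` loop in the diff. Here `std::vector<float>` stands in for `framework::LoDTensor`; this illustrates the dataflow only, not the real API.

```cpp
// Illustration only: accumulate per-place gradients into the place-0 copy.
#include <cstddef>
#include <vector>

void MergeGrads(std::vector<std::vector<float>> &per_place_grad) {
  std::vector<float> &merged = per_place_grad[0];  // gradient living on place 0
  std::vector<float> buf(merged.size());           // plays the role of the "@BUF" tensor
  for (std::size_t place_idx = 1; place_idx < per_place_grad.size(); ++place_idx) {
    buf = per_place_grad[place_idx];               // analogous to CopyFrom(tt, places[0], t_buf)
    for (std::size_t i = 0; i < merged.size(); ++i) {
      merged[i] += buf[i];                         // analogous to running sum_op on place 0
    }
  }
}

int main() {
  // Two places, each holding a 3-element gradient for the same parameter.
  std::vector<std::vector<float>> grads = {{1.f, 2.f, 3.f}, {10.f, 20.f, 30.f}};
  MergeGrads(grads);  // grads[0] is now {11, 22, 33}
  return 0;
}
```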