diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
index b055bb48f608c9fd9cc671d175cb463d25dc489b..e568174957c8237344f5365c4cca46b9e43da0ca 100644
--- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
@@ -36,7 +36,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
     // Wait input done
     for (auto *in : inputs_) {
       auto &p = static_cast<VarHandle *>(in)->place_;
-      in->generated_op_->Wait(dev_ctxes_[p]);
+      if (in->generated_op_) in->generated_op_->Wait(dev_ctxes_[p]);
     }
 
     auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
diff --git a/paddle/fluid/framework/details/send_op_handle.cc b/paddle/fluid/framework/details/send_op_handle.cc
index 0763f92171e7813ec0ee8ca4f3aa42b76205130a..797d795d3d729b89ca427387492073587109427f 100644
--- a/paddle/fluid/framework/details/send_op_handle.cc
+++ b/paddle/fluid/framework/details/send_op_handle.cc
@@ -32,7 +32,7 @@ void SendOpHandle::RunImpl() {
     if (in->DebugString() == "dummy") {  // HACK
      continue;
    }
-    in->generated_op_->Wait(dev_ctxes_[p]);
+    if (in->generated_op_) in->generated_op_->Wait(dev_ctxes_[p]);
   }
   auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
   // FIXME(wuyi): can not use RunAndRecordEvent here, for it will cause dead
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 5fbe35e205deffe8e6f24e2a4dcf326c04323618..e54dccbbe5e02fe6fb008b71242f8cf20892ca90 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import numpy
+import numpy as np
 import unittest
 
 import paddle.fluid as fluid
@@ -243,7 +243,7 @@ class TestParallelExecutorBase(unittest.TestCase):
         begin = time.time()
         first_loss, = run_executor(
             exe=exe, feed=feed_dict, fetch_list=[loss.name])
-        first_loss = numpy.array(first_loss)
+        first_loss = np.array(first_loss)
 
         for i in xrange(iter):
             run_executor(exe=exe, feed=feed_dict, fetch_list=[])
@@ -256,7 +256,7 @@ class TestParallelExecutorBase(unittest.TestCase):
             print "%.4f Instance per second" % (
                 (batch_size * iter + 2) / (end - begin))
 
-        last_loss = numpy.array(last_loss)
+        last_loss = np.array(last_loss)
 
         print first_loss, last_loss
         # self.assertGreater(first_loss[0], last_loss[0])
@@ -284,8 +284,8 @@ class TestMNIST(TestParallelExecutorBase):
         self.check_network_convergence(simple_fc_net)
         self.check_network_convergence(simple_fc_net, allow_op_delay=True)
 
-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
             simple_fc_net, feed_dict={"image": img, "label": label})
 
@@ -294,8 +294,8 @@ class TestMNIST(TestParallelExecutorBase):
         self.check_simple_fc_convergence()
 
     def check_simple_fc_parallel_accuracy(self):
-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         single_first_loss, single_last_loss = self.check_network_convergence(
             method=simple_fc_net,
             seed=1000,
@@ -319,8 +319,8 @@ class TestMNIST(TestParallelExecutorBase):
     def check_batchnorm_fc_convergence(self):
         self.check_network_convergence(fc_with_batchnorm)
 
-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
             fc_with_batchnorm, feed_dict={"image": img, "label": label})
 
@@ -404,9 +404,6 @@ class ModelHyperParams(object):
     dropout = 0.1
 
 
-import numpy as np
-
-
 def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
     """
     Pad the instances to the max sequence length in batch, and generate the
@@ -533,9 +530,8 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
             opt.minimize(loss)
 
             batch_size = 32
-            image = numpy.random.normal(size=(batch_size,
-                                              784)).astype('float32')
-            label = numpy.random.randint(0, 10, (batch_size, 1), dtype="int64")
+            image = np.random.normal(size=(batch_size, 784)).astype('float32')
+            label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")
 
             place = fluid.CUDAPlace(0)
             exe = fluid.Executor(place)
@@ -552,12 +548,12 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
             for i in xrange(5):
                 test_loss, = test_exe.run([loss.name], feed=feed_dict)
-                test_loss = numpy.array(test_loss)
+                test_loss = np.array(test_loss)
 
                 train_loss, = train_exe.run([loss.name], feed=feed_dict)
-                train_loss = numpy.array(train_loss)
+                train_loss = np.array(train_loss)
 
                 self.assertTrue(
-                    numpy.allclose(
+                    np.allclose(
                         train_loss, test_loss, atol=1e-8),
                     "Train loss: " + str(train_loss) + "\n Test loss:" +
                     str(test_loss))
@@ -712,7 +708,7 @@ class TestCRFModel(unittest.TestCase):
             data = train_data()
             for i in xrange(10):
                 cur_batch = next(data)
-                print map(numpy.array,
+                print map(np.array,
                           pe.run(feed=feeder.feed(cur_batch),
                                  fetch_list=[avg_cost.name]))[0]
 
@@ -723,7 +719,7 @@ class TestCRFModel(unittest.TestCase):
         self.check_network_convergence(is_sparse=False)
 
 
-# test fetch op
+# test fetch all the variables of global_block
 import paddle.dataset.flowers as flowers
 
 
@@ -763,7 +759,8 @@ class TestFetchOp(unittest.TestCase):
             opt.minimize(loss)
 
             # TODO(zcd): I found that onece the memory optimizer is open,
-            # parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD, conv2d_1.b_0@GRAD.
+            # parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
+            # conv2d_1.b_0@GRAD. Those variables should not be pruned.
             # fluid.memory_optimize(main)
 
             place = fluid.CUDAPlace(0)
@@ -775,16 +772,15 @@ class TestFetchOp(unittest.TestCase):
                 use_cuda=True, loss_name=loss.name, main_program=main)
 
             fetch_list = []
-            for data in train_inputs:
-                all_vars = main.global_block().vars
-                for k, v in all_vars.iteritems():
-                    if v.persistable and 'velocity' not in k:
-                        fetch_list.append(k)
+            all_vars = main.global_block().vars
+            for k, v in all_vars.iteritems():
+                if 'velocity' not in k:
+                    fetch_list.append(k)
 
+            for data in train_inputs:
                 ret = pe.run(fetch_list, feed=feeder.feed(data))
-                result = {}
                 for i in range(len(fetch_list)):
-                    result[fetch_list[i]] = np.sum(ret[i])
+                    print("%s - %s" % (fetch_list[i], np.sum(ret[i])))
 
     def test_update_sparse_parameter(self):
         tst_reader = paddle.batch(flowers.test(use_xmap=False), batch_size=16)