diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index cee97820d6a0337432aed8a7987290ce7a4bc0e9..c99200ec98aa8f0736610f659d3b94e3c2f1e023 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -239,6 +239,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get(); break; + case BuildStrategy::ReduceStrategy::kNoReduce: + multi_devices_pass = AppendPass("no_reduce_multi_devices_pass").get(); + break; default: PADDLE_THROW( platform::errors::Unimplemented("Unknown reduce strategy.")); @@ -475,6 +478,7 @@ USE_PASS(fuse_bn_act_pass); USE_PASS(fuse_bn_add_act_pass); USE_PASS(graph_viz_pass); USE_PASS(multi_batch_merge_pass); +USE_PASS(no_reduce_multi_devices_pass); USE_PASS(reduce_mode_multi_devices_pass); USE_PASS(all_reduce_mode_multi_devices_pass); USE_PASS(dist_multi_devices_pass); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index f9c28cbee50c3e4d7ba75d9e8e5fc4dc785f2157..70a083dd70bc3b48bf24b050673f3da7b69b1755 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -72,7 +72,7 @@ struct BuildStrategy { // For CPU, if you want to fix the order of summing to make the result // of kAllReduce and kReduce no diff, you can add // `FLAGS_cpu_deterministic=true` to env. - enum class ReduceStrategy { kAllReduce = 0, kReduce = 1 }; + enum class ReduceStrategy { kAllReduce = 0, kReduce = 1, kNoReduce = 2 }; enum class GradientScaleStrategy { kCoeffNumDevice = 0, diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 28108e78d9d9994b87c603112882f91d41f73cef..da2147f40363e898987f22e4edc643c49ca7f1da 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -117,6 +117,7 @@ message BuildStrategy { optional bool enable_addto = 12 [ default = false ]; optional bool fix_op_run_order = 13 [ default = false ]; optional bool allow_cuda_graph_capture = 14 [ default = false ]; + optional int32 reduce_strategy = 15 [ default = 0 ]; } message ExecutionStrategy { diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc index c50e00f9995101ede3b357c012aa57865b597a3e..5dbc3e38ea135aed171a0b77c5a29b68e1b3193c 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc @@ -1283,3 +1283,5 @@ REGISTER_MULTI_DEVICES_PASS(dist_multi_devices_pass, paddle::framework::ir::DistSSAGraphBuilder); REGISTER_MULTI_DEVICES_PASS(async_multi_devices_pass, paddle::framework::ir::AsyncSSAGraphBuilder); +REGISTER_MULTI_DEVICES_PASS(no_reduce_multi_devices_pass, + paddle::framework::ir::NoReduceSSAGraphBuilder); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h index 27eda22828e03ee1d15affcd69fed217b1700240..c76f30016763a3bdd6c0fb7146bffe68e30bca3d 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h @@ -144,6 +144,15 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { bool IsEncoded(const std::string &p_name) const; }; +class NoReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { + protected: + void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, + const std::string &g_name) const override {} + + void InsertPostprocessOps(ir::Graph *result) const override {} +}; + class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: void InsertCollectiveOp(ir::Graph *result, ir::Node *node, diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 5f4e9a88613905156ccdd2a8070b386bb8b0944f..3eabf255ccbacb8a17d7504b9279bf1bb6e68294 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -2984,7 +2984,8 @@ All parameter, weight, gradient are variables in Paddle. py::enum_(build_strategy, "ReduceStrategy") .value("Reduce", BuildStrategy::ReduceStrategy::kReduce) - .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce); + .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce) + .value("_NoReduce", BuildStrategy::ReduceStrategy::kNoReduce); py::enum_(build_strategy, "GradientScaleStrategy") .value("CoeffNumDevice", diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 3b8b36a61e2fb73de67d40ea981d9eb1f4455b00..8c9499628e7678bf0ae4f715dcad344cef88ab96 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -102,6 +102,10 @@ class DistributedJobInfo(object): self.job_info.strategy = dist_strategy +ReduceStrategyFluid = paddle.fluid.BuildStrategy.ReduceStrategy +ReduceStrategyFleet = int + + class DistributedStrategy(object): __lock_attr = False @@ -239,8 +243,10 @@ class DistributedStrategy(object): build_strategy = paddle.fluid.BuildStrategy() fields = self.strategy.build_strategy.DESCRIPTOR.fields for f in fields: - setattr(build_strategy, f.name, - getattr(self.strategy.build_strategy, f.name)) + value = getattr(self.strategy.build_strategy, f.name) + if f.name == 'reduce_strategy': + value = ReduceStrategyFluid(value) + setattr(build_strategy, f.name, value) return build_strategy @build_strategy.setter @@ -249,8 +255,10 @@ class DistributedStrategy(object): fields = self.strategy.build_strategy.DESCRIPTOR.fields for f in fields: if f.label == 1 or f.label == 2: # optional and required field - setattr(self.strategy.build_strategy, f.name, - getattr(strategy, f.name)) + value = getattr(strategy, f.name) + if f.name == 'reduce_strategy': + value = ReduceStrategyFleet(value) + setattr(self.strategy.build_strategy, f.name, value) elif f.label == 3: # repeated field getattr(self.strategy.build_strategy, f.name).extend(getattr(strategy, f.name)) diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index 2698f1a00dc8034dbf2e6750131d103abc45df68..1fa86d0aeeabd65932b545b77f236b49062ec872 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -85,6 +85,16 @@ def _has_optimizer_in_control_flow(program): return False +def _should_broadcast_or_not_exists(program, var_name): + block = program.global_block() + var = block.vars.get(var_name, None) + if var is None: + return True + is_distributed = getattr(var, '_is_distributed', False) or getattr( + var, 'is_distributed', False) + return not is_distributed + + class CompiledProgram(object): """ :api_attr: Static Graph @@ -398,7 +408,10 @@ class CompiledProgram(object): for node in self._graph.nodes(): if node.is_var() and node.var() is not None and node.var().persistable() and \ node.var().type() != core.VarDesc.VarType.RAW: - self._persistable_vars.append(cpt.to_text(node.name())) + name = cpt.to_text(node.name()) + if self._program is not None and _should_broadcast_or_not_exists( + self._program, name): + self._persistable_vars.append(cpt.to_text(node.name())) places = list(map(_place_obj, places)) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index 2c79670f1a27cda72475e474fa992a1a5da987e3..61d643f24c17a9945239119e621732b779729b81 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -65,17 +65,18 @@ def fc_with_batchnorm(use_feed): return loss +def init_data(): + np.random.seed(5) + img = np.random.random(size=[32, 784]).astype(np.float32) + label = np.ones(shape=[32, 1], dtype='int64') + return img, label + + class TestMNIST(TestParallelExecutorBase): @classmethod def setUpClass(cls): os.environ['CPU_NUM'] = str(4) - def _init_data(self): - np.random.seed(5) - img = np.random.random(size=[32, 784]).astype(np.float32) - label = np.ones(shape=[32, 1], dtype='int64') - return img, label - def _compare_reduce_and_allreduce(self, model, use_device, @@ -87,7 +88,7 @@ class TestMNIST(TestParallelExecutorBase): if use_device == DeviceType.XPU and not core.is_compiled_with_xpu(): return - img, label = self._init_data() + img, label = init_data() all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence( model, @@ -116,7 +117,7 @@ class TestMNIST(TestParallelExecutorBase): if use_device == DeviceType.XPU and not core.is_compiled_with_xpu(): return - img, label = self._init_data() + img, label = init_data() self.check_network_convergence( simple_fc_net, @@ -144,7 +145,7 @@ class TestMNIST(TestParallelExecutorBase): if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda(): return - img, label = self._init_data() + img, label = init_data() single_first_loss, single_last_loss = self.check_network_convergence( method=simple_fc_net, @@ -175,7 +176,7 @@ class TestMNIST(TestParallelExecutorBase): return if use_device == DeviceType.XPU and not core.is_compiled_with_xpu(): return - img, label = self._init_data() + img, label = init_data() self.check_network_convergence( fc_with_batchnorm, @@ -199,6 +200,84 @@ class TestMNIST(TestParallelExecutorBase): 1e-5, 1e-2) +class TestMNISTNoReduce(unittest.TestCase): + def run_program(self, device_type): + if device_type == DeviceType.CUDA: + if not paddle.is_compiled_with_cuda(): + return + places = paddle.static.cuda_places() + else: + self.assertEqual(device_type, DeviceType.CPU) + places = paddle.static.cpu_places(4) + + paddle.seed(10) + with paddle.fluid.unique_name.guard(): + main = paddle.static.Program() + startup = paddle.static.Program() + with paddle.static.program_guard(main, startup): + loss = simple_fc_net(use_feed=True) + optimizer = paddle.optimizer.SGD(learning_rate=0.0) + optimizer.minimize(loss) + + grads = [p.name + '@GRAD' for p in main.all_parameters()] + no_reduce = paddle.static.BuildStrategy.ReduceStrategy._NoReduce + + build_strategy = paddle.static.BuildStrategy() + build_strategy.reduce_strategy = no_reduce + main_multi_place = paddle.static.CompiledProgram( + main).with_data_parallel( + loss_name=loss.name, + build_strategy=build_strategy, + places=places) + + build_strategy = paddle.static.BuildStrategy() + build_strategy.reduce_strategy = no_reduce + main_single_place = paddle.static.CompiledProgram(main.clone( + )).with_data_parallel( + loss_name=loss.name, + build_strategy=build_strategy, + places=places[0]) + + image, label = init_data() + feed = {'image': image, 'label': label} + exe = paddle.static.Executor(places[0]) + scope = paddle.static.Scope() + with paddle.static.scope_guard(scope): + exe.run(startup) + grads_multi_place = exe.run(main_multi_place, + feed=feed, + fetch_list=[grads]) + + feeds = self.split_feed(feed, len(places)) + grads_single_place = [list() for _ in range(len(grads))] + for f in feeds: + gs = exe.run(main_single_place, feed=f, fetch_list=[grads]) + for i, g in enumerate(gs): + grads_single_place[i].append(g) + + for i in range(len(grads)): + grads_single_place[i] = np.concatenate( + grads_single_place[i], axis=0) / len(places) + + self.assertEqual(len(grads_multi_place), len(grads_single_place)) + for g1, g2 in zip(grads_multi_place, grads_single_place): + self.assertTrue( + np.allclose(g1, g2), 'g1 = {}\ng2 = {}\n'.format(g1, g2)) + + def split_feed(self, feed, n): + image = feed['image'] + label = feed['label'] + self.assertEqual(image.shape[0] % n, 0) + self.assertEqual(label.shape[0] % n, 0) + images = np.split(image, n) + labels = np.split(label, n) + return [{'image': images[i], 'label': labels[i]} for i in range(n)] + + def test_main(self): + self.run_program(DeviceType.CUDA) + self.run_program(DeviceType.CPU) + + if __name__ == '__main__': paddle.enable_static() unittest.main()