Unverified commit e50d883e authored by sneaxiy, committed by GitHub

Add NoReduce mode for ParallelExecutor (#38969)

* add no reduce mode for pe

* add NoReduce ut
Parent 6eeb16b8
@@ -239,6 +239,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
        multi_devices_pass =
            AppendPass("reduce_mode_multi_devices_pass").get();
        break;
      case BuildStrategy::ReduceStrategy::kNoReduce:
        multi_devices_pass = AppendPass("no_reduce_multi_devices_pass").get();
        break;
      default:
        PADDLE_THROW(
            platform::errors::Unimplemented("Unknown reduce strategy."));
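For context, a minimal Python sketch of how this new branch gets exercised (it mirrors the TestMNISTNoReduce unit test added in this commit; `main_program`, `loss`, and `places` are assumed to come from an existing static-graph setup):

```python
import paddle

build_strategy = paddle.static.BuildStrategy()
build_strategy.reduce_strategy = \
    paddle.static.BuildStrategy.ReduceStrategy._NoReduce

# Building the multi-device graph with this strategy appends
# no_reduce_multi_devices_pass, so no reduce/all_reduce ops are inserted.
compiled = paddle.static.CompiledProgram(main_program).with_data_parallel(
    loss_name=loss.name, build_strategy=build_strategy, places=places)
```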
@@ -475,6 +478,7 @@ USE_PASS(fuse_bn_act_pass);
USE_PASS(fuse_bn_add_act_pass);
USE_PASS(graph_viz_pass);
USE_PASS(multi_batch_merge_pass);
USE_PASS(no_reduce_multi_devices_pass);
USE_PASS(reduce_mode_multi_devices_pass);
USE_PASS(all_reduce_mode_multi_devices_pass);
USE_PASS(dist_multi_devices_pass);
......
@@ -72,7 +72,7 @@ struct BuildStrategy {
  // For CPU, if you want to fix the order of summing to make the result
  // of kAllReduce and kReduce no diff, you can add
  // `FLAGS_cpu_deterministic=true` to env.
  enum class ReduceStrategy { kAllReduce = 0, kReduce = 1, kNoReduce = 2 };

  enum class GradientScaleStrategy {
    kCoeffNumDevice = 0,
......
@@ -117,6 +117,7 @@ message BuildStrategy {
  optional bool enable_addto = 12 [ default = false ];
  optional bool fix_op_run_order = 13 [ default = false ];
  optional bool allow_cuda_graph_capture = 14 [ default = false ];
  optional int32 reduce_strategy = 15 [ default = 0 ];
}

message ExecutionStrategy {
......
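The proto stores the strategy as a plain int32 defaulting to 0, which lines up with the C++ enum values above (kAllReduce = 0, kReduce = 1, kNoReduce = 2). A small sketch of that correspondence as seen from Python (assuming the bound enum supports int conversion, as pybind11 enums normally do):

```python
import paddle

ReduceStrategy = paddle.static.BuildStrategy.ReduceStrategy
# The same integers end up in the reduce_strategy proto field.
print(int(ReduceStrategy.AllReduce))   # 0 (the proto default)
print(int(ReduceStrategy.Reduce))      # 1
print(int(ReduceStrategy._NoReduce))   # 2
```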
@@ -1283,3 +1283,5 @@ REGISTER_MULTI_DEVICES_PASS(dist_multi_devices_pass,
                            paddle::framework::ir::DistSSAGraphBuilder);
REGISTER_MULTI_DEVICES_PASS(async_multi_devices_pass,
                            paddle::framework::ir::AsyncSSAGraphBuilder);
REGISTER_MULTI_DEVICES_PASS(no_reduce_multi_devices_pass,
                            paddle::framework::ir::NoReduceSSAGraphBuilder);
@@ -144,6 +144,15 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
  bool IsEncoded(const std::string &p_name) const;
};

class NoReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
 protected:
  void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
                          const std::string &p_name,
                          const std::string &g_name) const override {}

  void InsertPostprocessOps(ir::Graph *result) const override {}
};

class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
 protected:
  void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
......
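Both hooks of NoReduceSSAGraphBuilder are intentionally empty, so no collective (reduce/all_reduce) ops are added to the multi-device graph and every device keeps the gradient it computed locally. A NumPy-only illustration of the resulting difference (not Paddle code, just the idea):

```python
# Illustration only: per-device outcome under AllReduce vs. the new NoReduce
# mode, for two hypothetical device gradients.
import numpy as np

local_grads = [np.array([1.0, 2.0]), np.array([3.0, 6.0])]  # per-device grads

# AllReduce: every device ends up with the same aggregated gradient.
aggregated = np.sum(local_grads, axis=0)
all_reduce_view = [aggregated for _ in local_grads]

# NoReduce: no collective op is inserted, each device keeps its own gradient.
no_reduce_view = local_grads

print(all_reduce_view)  # [array([4., 8.]), array([4., 8.])]
print(no_reduce_view)   # [array([1., 2.]), array([3., 6.])]
```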
@@ -2984,7 +2984,8 @@ All parameter, weight, gradient are variables in Paddle.
  py::enum_<BuildStrategy::ReduceStrategy>(build_strategy, "ReduceStrategy")
      .value("Reduce", BuildStrategy::ReduceStrategy::kReduce)
      .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce)
      .value("_NoReduce", BuildStrategy::ReduceStrategy::kNoReduce);
  py::enum_<BuildStrategy::GradientScaleStrategy>(build_strategy,
                                                  "GradientScaleStrategy")
      .value("CoeffNumDevice",
......
@@ -102,6 +102,10 @@ class DistributedJobInfo(object):
        self.job_info.strategy = dist_strategy


ReduceStrategyFluid = paddle.fluid.BuildStrategy.ReduceStrategy
ReduceStrategyFleet = int


class DistributedStrategy(object):
    __lock_attr = False
@@ -239,8 +243,10 @@ class DistributedStrategy(object):
        build_strategy = paddle.fluid.BuildStrategy()
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            value = getattr(self.strategy.build_strategy, f.name)
            if f.name == 'reduce_strategy':
                value = ReduceStrategyFluid(value)
            setattr(build_strategy, f.name, value)
        return build_strategy

    @build_strategy.setter
@@ -249,8 +255,10 @@ class DistributedStrategy(object):
        fields = self.strategy.build_strategy.DESCRIPTOR.fields
        for f in fields:
            if f.label == 1 or f.label == 2:  # optional and required field
                value = getattr(strategy, f.name)
                if f.name == 'reduce_strategy':
                    value = ReduceStrategyFleet(value)
                setattr(self.strategy.build_strategy, f.name, value)
            elif f.label == 3:  # repeated field
                getattr(self.strategy.build_strategy,
                        f.name).extend(getattr(strategy, f.name))
......
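The getter and setter above translate between the proto's integer field and the fluid enum via ReduceStrategyFluid/ReduceStrategyFleet. A hedged sketch of that round trip (assumes a Paddle build with fleet available):

```python
# The setter stores the enum as an int in the proto (ReduceStrategyFleet);
# the getter converts it back to the fluid enum (ReduceStrategyFluid).
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()

bs = paddle.static.BuildStrategy()
bs.reduce_strategy = paddle.static.BuildStrategy.ReduceStrategy.Reduce

strategy = fleet.DistributedStrategy()
strategy.build_strategy = bs

restored = strategy.build_strategy
print(type(restored.reduce_strategy))  # the ReduceStrategy enum, not a bare int
```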
@@ -85,6 +85,16 @@ def _has_optimizer_in_control_flow(program):
    return False


def _should_broadcast_or_not_exists(program, var_name):
    block = program.global_block()
    var = block.vars.get(var_name, None)
    if var is None:
        return True
    is_distributed = getattr(var, '_is_distributed', False) or getattr(
        var, 'is_distributed', False)
    return not is_distributed


class CompiledProgram(object):
    """
    :api_attr: Static Graph
@@ -398,7 +408,10 @@ class CompiledProgram(object):
        for node in self._graph.nodes():
            if node.is_var() and node.var() is not None and node.var().persistable() and \
                    node.var().type() != core.VarDesc.VarType.RAW:
                name = cpt.to_text(node.name())
                if self._program is not None and _should_broadcast_or_not_exists(
                        self._program, name):
                    self._persistable_vars.append(cpt.to_text(node.name()))

        places = list(map(_place_obj, places))
......
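_should_broadcast_or_not_exists keeps variables flagged as distributed out of the persistable-variable broadcast list; anything else (or a name missing from the program) is still broadcast. A small sketch of that check for an ordinary variable (the variable name 'x' is purely illustrative):

```python
# An ordinary variable has no distributed flag set, so it would still be
# broadcast (the helper returns True for it).
import paddle

paddle.enable_static()
prog = paddle.static.Program()
with paddle.static.program_guard(prog):
    x = paddle.static.data(name='x', shape=[None, 8], dtype='float32')

var = prog.global_block().vars.get('x', None)
is_distributed = getattr(var, '_is_distributed', False) or getattr(
    var, 'is_distributed', False)
print(not is_distributed)  # True
```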
@@ -65,17 +65,18 @@ def fc_with_batchnorm(use_feed):
    return loss


def init_data():
    np.random.seed(5)
    img = np.random.random(size=[32, 784]).astype(np.float32)
    label = np.ones(shape=[32, 1], dtype='int64')
    return img, label


class TestMNIST(TestParallelExecutorBase):
    @classmethod
    def setUpClass(cls):
        os.environ['CPU_NUM'] = str(4)

    def _init_data(self):
        np.random.seed(5)
        img = np.random.random(size=[32, 784]).astype(np.float32)
        label = np.ones(shape=[32, 1], dtype='int64')
        return img, label

    def _compare_reduce_and_allreduce(self,
                                      model,
                                      use_device,
@@ -87,7 +88,7 @@ class TestMNIST(TestParallelExecutorBase):
        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
            return

        img, label = init_data()

        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
            model,
@@ -116,7 +117,7 @@ class TestMNIST(TestParallelExecutorBase):
        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
            return

        img, label = init_data()

        self.check_network_convergence(
            simple_fc_net,
@@ -144,7 +145,7 @@ class TestMNIST(TestParallelExecutorBase):
        if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
            return

        img, label = init_data()

        single_first_loss, single_last_loss = self.check_network_convergence(
            method=simple_fc_net,
@@ -175,7 +176,7 @@ class TestMNIST(TestParallelExecutorBase):
            return
        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
            return

        img, label = init_data()

        self.check_network_convergence(
            fc_with_batchnorm,
@@ -199,6 +200,84 @@ class TestMNIST(TestParallelExecutorBase):
                                            1e-5, 1e-2)
class TestMNISTNoReduce(unittest.TestCase):
    def run_program(self, device_type):
        if device_type == DeviceType.CUDA:
            if not paddle.is_compiled_with_cuda():
                return
            places = paddle.static.cuda_places()
        else:
            self.assertEqual(device_type, DeviceType.CPU)
            places = paddle.static.cpu_places(4)

        paddle.seed(10)
        with paddle.fluid.unique_name.guard():
            main = paddle.static.Program()
            startup = paddle.static.Program()
            with paddle.static.program_guard(main, startup):
                loss = simple_fc_net(use_feed=True)
                optimizer = paddle.optimizer.SGD(learning_rate=0.0)
                optimizer.minimize(loss)

        grads = [p.name + '@GRAD' for p in main.all_parameters()]
        no_reduce = paddle.static.BuildStrategy.ReduceStrategy._NoReduce

        build_strategy = paddle.static.BuildStrategy()
        build_strategy.reduce_strategy = no_reduce
        main_multi_place = paddle.static.CompiledProgram(
            main).with_data_parallel(
                loss_name=loss.name,
                build_strategy=build_strategy,
                places=places)

        build_strategy = paddle.static.BuildStrategy()
        build_strategy.reduce_strategy = no_reduce
        main_single_place = paddle.static.CompiledProgram(main.clone(
        )).with_data_parallel(
            loss_name=loss.name,
            build_strategy=build_strategy,
            places=places[0])

        image, label = init_data()
        feed = {'image': image, 'label': label}
        exe = paddle.static.Executor(places[0])
        scope = paddle.static.Scope()
        with paddle.static.scope_guard(scope):
            exe.run(startup)
            grads_multi_place = exe.run(main_multi_place,
                                        feed=feed,
                                        fetch_list=[grads])

            feeds = self.split_feed(feed, len(places))
            grads_single_place = [list() for _ in range(len(grads))]
            for f in feeds:
                gs = exe.run(main_single_place, feed=f, fetch_list=[grads])
                for i, g in enumerate(gs):
                    grads_single_place[i].append(g)

            for i in range(len(grads)):
                grads_single_place[i] = np.concatenate(
                    grads_single_place[i], axis=0) / len(places)

        self.assertEqual(len(grads_multi_place), len(grads_single_place))
        for g1, g2 in zip(grads_multi_place, grads_single_place):
            self.assertTrue(
                np.allclose(g1, g2), 'g1 = {}\ng2 = {}\n'.format(g1, g2))

    def split_feed(self, feed, n):
        image = feed['image']
        label = feed['label']
        self.assertEqual(image.shape[0] % n, 0)
        self.assertEqual(label.shape[0] % n, 0)
        images = np.split(image, n)
        labels = np.split(label, n)
        return [{'image': images[i], 'label': labels[i]} for i in range(n)]

    def test_main(self):
        self.run_program(DeviceType.CUDA)
        self.run_program(DeviceType.CPU)
if __name__ == '__main__':
    paddle.enable_static()
    unittest.main()