未验证 提交 01950ceb 编写于 作者: L lilong12 提交者: GitHub

fix the bug in pipeline data parallelism (#29731)

* update, test=develop
上级 0f4b2186
...@@ -41,35 +41,36 @@ class PipelineHelper(object): ...@@ -41,35 +41,36 @@ class PipelineHelper(object):
inner_parallelism=None): inner_parallelism=None):
self.startup_program = startup_program self.startup_program = startup_program
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()]
node_num = _get_node_num(endpoints)
assert len(endpoints) % node_num == 0
nranks = self.role_maker._worker_num() nranks = self.role_maker._worker_num()
rank = self.role_maker._worker_index() rank = self.role_maker._worker_index()
endpoints = self.role_maker._get_trainer_endpoints()
# Create ring 0 for all gpus in a pipeline current_endpoint = endpoints[rank]
pipeline_endpoints = [] node_num = _get_node_num(endpoints)
pipeline_rank = rank % inner_parallelism assert nranks % node_num == 0
pipeline_id = rank // inner_parallelism
for idx, ep in enumerate(endpoints): # Create ring 0 for all gpus in the same pipeline
if idx // inner_parallelism == pipeline_id: if inner_parallelism > 1:
pipeline_endpoints.append(ep) pipeline_rank = rank % inner_parallelism
self._init_communicator(self.startup_program, current_endpoint, pipeline_id = rank // inner_parallelism
pipeline_endpoints, pipeline_rank, 0, start_index = pipeline_id * inner_parallelism
self.wait_port) pipeline_endpoints = endpoints[start_index:start_index +
inner_parallelism]
self._init_communicator(self.startup_program, current_endpoint,
pipeline_endpoints, pipeline_rank, 0,
self.wait_port)
pipeline_num = len(endpoints) // inner_parallelism pipeline_num = len(endpoints) // inner_parallelism
if pipeline_num == 1: return if pipeline_num == 1: return
# Create rings for gpus with the same gpu id # Create rings for gpus with the same pipeline id for data parallel
eps = [] eps = []
local_rank = self.role_maker._worker_index() % inner_parallelism pipeline_rank = rank % inner_parallelism
ring_id = local_rank + 1 ring_id = pipeline_rank + 1
for i in range(pipeline_num): for i in range(pipeline_num):
eps.append(endpoints[i * inner_parallelism + local_rank]) eps.append(endpoints[i * inner_parallelism + pipeline_rank])
temp_rank = self.role_maker._worker_index() // inner_parallelism # rank in a ring of gpus with the same pipeline id for data parallel
dp_rank = rank // inner_parallelism
self._init_communicator(self.startup_program, current_endpoint, eps, self._init_communicator(self.startup_program, current_endpoint, eps,
temp_rank, ring_id, self.wait_port) dp_rank, ring_id, self.wait_port)
self._broadcast_params(ring_id) self._broadcast_params(ring_id)
def _init_communicator(self, program, current_endpoint, endpoints, rank, def _init_communicator(self, program, current_endpoint, endpoints, rank,
...@@ -108,8 +109,10 @@ class PipelineHelper(object): ...@@ -108,8 +109,10 @@ class PipelineHelper(object):
def _broadcast_params(self, ring_id): def _broadcast_params(self, ring_id):
block = self.startup_program.global_block() block = self.startup_program.global_block()
for param in block.iter_parameters(): for var_name in block.vars:
if param.is_distributed: if "nccl_id" in var_name: continue
param = block.var(var_name)
if not param.persistable:
continue continue
block.append_op( block.append_op(
...@@ -136,7 +139,7 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -136,7 +139,7 @@ class PipelineOptimizer(MetaOptimizerBase):
self.inner_opt = optimizer self.inner_opt = optimizer
# we do not allow meta optimizer to be inner optimizer currently # we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = [] self.meta_optimizers_white_list = []
self.meta_optimizers_black_list = [] self.meta_optimizers_black_list = ["GraphExecutionOptimizer", ]
def _set_basic_info(self, loss, role_maker, user_defined_optimizer, def _set_basic_info(self, loss, role_maker, user_defined_optimizer,
user_defined_strategy): user_defined_strategy):
...@@ -161,14 +164,6 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -161,14 +164,6 @@ class PipelineOptimizer(MetaOptimizerBase):
dist_strategy.pipeline = True dist_strategy.pipeline = True
dist_strategy.pipeline_configs = {"micro_batch": 1, } dist_strategy.pipeline_configs = {"micro_batch": 1, }
def _get_local_rank(self, current_endpoint, endpoints):
cur_node_endpoints = []
cur_ip = current_endpoint.split(':')[0].strip()
for ep in endpoints:
if cur_ip == ep.split(':')[0].strip():
cur_node_endpoints.append(ep)
return cur_node_endpoints.index(current_endpoint)
def minimize_impl(self, def minimize_impl(self,
loss, loss,
startup_program=None, startup_program=None,
...@@ -176,56 +171,51 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -176,56 +171,51 @@ class PipelineOptimizer(MetaOptimizerBase):
no_grad_set=None): no_grad_set=None):
endpoints = self.role_maker._get_trainer_endpoints() endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()] current_endpoint = endpoints[self.role_maker._worker_index()]
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
self.wrapped_opt = PO(self.inner_opt, self.wrapped_opt = PO(self.inner_opt,
num_microbatches=self.num_microbatches, num_microbatches=self.num_microbatches)
start_cpu_core_id=self.local_rank)
node_num = _get_node_num(endpoints) node_num = _get_node_num(endpoints)
gpus_per_node = len(endpoints) // node_num gpus_per_node = len(endpoints) // node_num
self.startup_program = startup_program self.startup_program = startup_program
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
if startup_program is None: if startup_program is None:
self.startup_program = fluid.default_startup_program() self.startup_program = fluid.default_startup_program()
loss.block.program._pipeline_opt = dict() self.rank = self.role_maker._worker_index()
loss.block.program._pipeline_opt['local_rank'] = self.local_rank self.nranks = self.role_maker._worker_num()
optimize_ops, params_grads, prog_list = \ assert self.nranks % node_num == 0
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
loss.block.program._pipeline_opt = dict()
loss.block.program._pipeline_opt['local_rank'] = self.rank
optimize_ops, params_grads, prog_list = self.wrapped_opt.minimize(
loss, startup_program, parameter_list, no_grad_set)
assert prog_list assert prog_list
self.main_program_list = prog_list self.main_program_list = prog_list
self.main_program = loss.block.program self.main_program = loss.block.program
self.inner_parallelism = loss.block.program._pipeline_opt[ self.inner_parallelism = loss.block.program._pipeline_opt[
'inner_parallelism'] 'inner_parallelism']
nranks = len(endpoints) assert self.nranks % self.inner_parallelism == 0
self.nranks = nranks
self.nrings = len(self.main_program_list)
self.rank = self.role_maker._worker_index()
self.endpoints = endpoints
self.current_endpoint = current_endpoint
pipeline_helper = PipelineHelper(self.role_maker) pipeline_helper = PipelineHelper(self.role_maker)
pipeline_helper.update_startup_program( pipeline_helper.update_startup_program(
self.startup_program._pipeline_opt["startup_program"], self.startup_program._pipeline_opt["startup_program"],
self.inner_parallelism) self.inner_parallelism)
self._transpile_main_program(loss, node_num, gpus_per_node) pipeline_num = self.nranks // self.inner_parallelism
self._transpile_main_program(loss, pipeline_num, self.inner_parallelism)
return optimize_ops, params_grads return optimize_ops, params_grads
def _transpile_main_program(self, loss, node_num, gpus_per_node): def _transpile_main_program(self, loss, pipeline_num, inner_parallelism):
self._insert_loss_grad_ops(loss, gpus_per_node, node_num) if pipeline_num <= 1: return
for ring_id in range(1, gpus_per_node + 1): self._insert_loss_grad_ops(loss, pipeline_num)
for ring_id in range(1, inner_parallelism + 1):
self._insert_allreduce_ops(ring_id) self._insert_allreduce_ops(ring_id)
def _insert_loss_grad_ops(self, loss, gpus_per_node, node_num): def _insert_loss_grad_ops(self, loss, pipeline_num):
""" """
In order to keep the learning rate consistent in different numbers of In order to keep the learning rate consistent in different numbers of
training workers, we scale the loss grad by the number of workers training workers, we scale the loss grad by the number of workers
""" """
block = self.main_program_list[gpus_per_node - 1][ block = self.main_program_list[-1]['program'].global_block()
'program'].global_block()
for idx, op in reversed(list(enumerate(block.ops))): for idx, op in reversed(list(enumerate(block.ops))):
if is_loss_grad_op(op): if is_loss_grad_op(op):
loss_grad_var = block.vars[op.output_arg_names[0]] loss_grad_var = block.vars[op.output_arg_names[0]]
...@@ -235,7 +225,7 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -235,7 +225,7 @@ class PipelineOptimizer(MetaOptimizerBase):
inputs={'X': loss_grad_var}, inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var}, outputs={'Out': loss_grad_var},
attrs={ attrs={
'scale': 1.0 / node_num, 'scale': 1.0 / pipeline_num,
OP_ROLE_KEY: OpRole.Backward OP_ROLE_KEY: OpRole.Backward
}) })
...@@ -269,7 +259,7 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -269,7 +259,7 @@ class PipelineOptimizer(MetaOptimizerBase):
block._insert_op( block._insert_op(
offset, offset,
type='c_sync_calc_stream', type='c_allreduce_sum',
inputs={'X': grad}, inputs={'X': grad},
outputs={'Out': grad}, outputs={'Out': grad},
attrs={ attrs={
...@@ -283,7 +273,7 @@ class PipelineOptimizer(MetaOptimizerBase): ...@@ -283,7 +273,7 @@ class PipelineOptimizer(MetaOptimizerBase):
for idx, op in enumerate(block.ops): for idx, op in enumerate(block.ops):
if is_optimizer_op(op): if is_optimizer_op(op):
block._insert_op( block._insert_op(
idx + ring_id, idx,
type='c_sync_comm_stream', type='c_sync_comm_stream',
inputs={'X': grad}, inputs={'X': grad},
outputs={'Out': grad}, outputs={'Out': grad},
......
...@@ -16,6 +16,7 @@ from __future__ import print_function ...@@ -16,6 +16,7 @@ from __future__ import print_function
import numpy as np import numpy as np
import six import six
import os
import logging import logging
from collections import defaultdict from collections import defaultdict
...@@ -39,6 +40,7 @@ from .dygraph.learning_rate_scheduler import LearningRateDecay, _LearningRateEpo ...@@ -39,6 +40,7 @@ from .dygraph.learning_rate_scheduler import LearningRateDecay, _LearningRateEpo
from paddle.fluid import core from paddle.fluid import core
from paddle.fluid.layers import tensor from paddle.fluid.layers import tensor
from functools import reduce from functools import reduce
from functools import cmp_to_key
from .wrapped_decorator import signature_safe_contextmanager from .wrapped_decorator import signature_safe_contextmanager
from .. import compat as cpt from .. import compat as cpt
...@@ -3773,7 +3775,7 @@ class PipelineOptimizer(object): ...@@ -3773,7 +3775,7 @@ class PipelineOptimizer(object):
self._op_device_key = op_maker.kOpDeviceAttrName() self._op_device_key = op_maker.kOpDeviceAttrName()
self._param_device_map = None self._param_device_map = None
def _create_vars(self, block, main_program): def _create_vars(self, block, ori_block):
# Create vars for block, copied from main_program's global block # Create vars for block, copied from main_program's global block
used_var_set = set() used_var_set = set()
for op_idx in range(block.desc.op_size()): for op_idx in range(block.desc.op_size()):
...@@ -3785,7 +3787,8 @@ class PipelineOptimizer(object): ...@@ -3785,7 +3787,8 @@ class PipelineOptimizer(object):
if var in used_var_set or "_blocking_queue" in var: if var in used_var_set or "_blocking_queue" in var:
continue continue
used_var_set.add(var) used_var_set.add(var)
source_var = main_program.block(0).var(str(var)) if block._find_var_recursive(str(var)): continue
source_var = ori_block._var_recursive(str(var))
if source_var.type == core.VarDesc.VarType.READER: if source_var.type == core.VarDesc.VarType.READER:
block.create_var( block.create_var(
name=var, name=var,
...@@ -3840,45 +3843,65 @@ class PipelineOptimizer(object): ...@@ -3840,45 +3843,65 @@ class PipelineOptimizer(object):
op_desc = op.desc op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op() ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc) ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "") # ap_op._set_attr(self._op_device_key, "")
elif op.type == "create_py_reader" or op.type == "read": elif op.type == "create_py_reader" or op.type == "read" or op.type == "create_double_buffer_reader":
# Copy read related ops to all section to make them exit after each epoch. # Copy read related ops to all section to make them exit after each epoch.
for device in device_program_map.keys(): for device in device_program_map.keys():
program = device_program_map[device] program = device_program_map[device]
op_desc = op.desc op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op() ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc) ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
else: else:
program = device_program_map[device] program = device_program_map[device]
op_desc = op.desc op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op() ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc) ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
for key in sorted(device_program_map.keys()): for key in devices:
program = device_program_map[key] program = device_program_map[key]
program['program']._sync_with_cpp() program['program']._sync_with_cpp()
programs.append(program) programs.append(program)
return programs return programs
def _get_op_device_for_startup_program(self, var_name):
"""
For adam optimizer, it will add accumulators and initialize them
with fill_constant, and force the op device to cpu. Hence, we should
get the real op_device attribute of the fill_constant as the device
where the corresponding parameters on.
"""
assert "beta1_pow_acc" in var_name or "beta2_pow_acc" in var_name
param_name = var_name[0:var_name.index('_beta')]
device = self._param_device_map[param_name]
return device
def _split_startup_program(self, startup_program, local_rank): def _split_startup_program(self, startup_program, local_rank):
block = startup_program.block(0) block = startup_program.block(0)
new_startup_program = Program() new_startup_program = Program()
for op in block.ops: for op in block.ops:
device = op.attr(self._op_device_key) device = op.attr(self._op_device_key)
if device == "cpu":
assert op.type == "fill_constant", (
"For ops in startup "
"program that with the op_device attribute of cpu, "
"they must be fill_constant.")
output_var = op.output_arg_names[0]
device = self._get_op_device_for_startup_program(output_var)
if device: if device:
device_index = int(device.split(":")[1]) device_index = int(device.split(':')[1])
else: else:
device_index = None # LR related ops
if device_index is not None and device_index != local_rank: continue device = None
if device and device_index != local_rank: continue
op_desc = op.desc op_desc = op.desc
ap_op = new_startup_program.block(0).desc.append_op() ap_op = new_startup_program.block(0).desc.append_op()
ap_op.copy_from(op_desc) ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "") ap_op._set_attr(self._op_device_key, "")
new_startup_program._sync_with_cpp() new_startup_program._sync_with_cpp()
self._create_vars(new_startup_program.block(0), startup_program) self._create_vars(
new_startup_program.block(0), startup_program.global_block())
return new_startup_program return new_startup_program
def _find_post_op(self, ops, cur_op, var_name): def _find_post_op(self, ops, cur_op, var_name):
...@@ -4093,6 +4116,8 @@ class PipelineOptimizer(object): ...@@ -4093,6 +4116,8 @@ class PipelineOptimizer(object):
first_device = op.attr(self._op_device_key) first_device = op.attr(self._op_device_key)
break break
assert first_device assert first_device
first_device_type = first_device.split(":")[0]
assert first_device_type == "gpu"
# set op_device attr for lr-related ops # set op_device attr for lr-related ops
lrsched_role = int(self._op_role.LRSched) lrsched_role = int(self._op_role.LRSched)
...@@ -4136,10 +4161,11 @@ class PipelineOptimizer(object): ...@@ -4136,10 +4161,11 @@ class PipelineOptimizer(object):
dev_spec = op.attr(self._op_device_key) dev_spec = op.attr(self._op_device_key)
assert dev_spec, ("op_device attribute for op " assert dev_spec, ("op_device attribute for op "
"{} has not been set.".format(op.type)) "{} has not been set.".format(op.type))
dev_type = dev_spec.split(':')[0]
assert dev_type == "gpu", ("Now only gpu devices are supported "
"for pipeline parallelism.")
if not dev_spec in device_specs: if not dev_spec in device_specs:
device_specs.append(dev_spec) device_specs.append(dev_spec)
sorted_device_specs = sorted(device_specs)
assert sorted_device_specs == device_specs
return device_specs return device_specs
def _insert_sendrecv_ops_for_boundaries(self, block): def _insert_sendrecv_ops_for_boundaries(self, block):
...@@ -4216,6 +4242,7 @@ class PipelineOptimizer(object): ...@@ -4216,6 +4242,7 @@ class PipelineOptimizer(object):
device = self._param_device_map[param_name] device = self._param_device_map[param_name]
if device != dev_spec: continue if device != dev_spec: continue
grad_name = self._append_grad_suffix(param_name) grad_name = self._append_grad_suffix(param_name)
if not main_block.has_var(grad_name): continue
grad_var = main_block.vars[grad_name] grad_var = main_block.vars[grad_name]
main_block._insert_op( main_block._insert_op(
index=0, index=0,
...@@ -4297,6 +4324,7 @@ class PipelineOptimizer(object): ...@@ -4297,6 +4324,7 @@ class PipelineOptimizer(object):
ap_op = new_sub_block.desc.append_op() ap_op = new_sub_block.desc.append_op()
ap_op.copy_from(op_desc) ap_op.copy_from(op_desc)
new_sub_block._sync_with_cpp() new_sub_block._sync_with_cpp()
self._create_vars(new_sub_block, origin_sub_block)
op._set_attr('sub_block:', new_sub_block) op._set_attr('sub_block:', new_sub_block)
def _get_device_info(self, block): def _get_device_info(self, block):
...@@ -4318,6 +4346,7 @@ class PipelineOptimizer(object): ...@@ -4318,6 +4346,7 @@ class PipelineOptimizer(object):
prog = prog_info['program'] prog = prog_info['program']
block = prog.block(0) block = prog.block(0)
for var_name in block.vars: for var_name in block.vars:
if var_name == "double_buffer_0": continue
var = block.var(var_name) var = block.var(var_name)
if not var.persistable: continue if not var.persistable: continue
if not var_name in var_info: if not var_name in var_info:
...@@ -4413,30 +4442,33 @@ class PipelineOptimizer(object): ...@@ -4413,30 +4442,33 @@ class PipelineOptimizer(object):
self._add_default_opdevice_attr(main_block) self._add_default_opdevice_attr(main_block)
device_specs = self._check_validation(main_block) device_specs = self._check_validation(main_block)
assert len(device_specs) > 1
def device_cmp(device1, device2):
dev1_id = int(device1.split(':')[1])
dev2_id = int(device2.split(':')[1])
if dev1_id < dev2_id:
return -1
elif dev1_id > dev2_id:
return 1
else:
return 0
sorted_device_spec = sorted(device_specs, key=cmp_to_key(device_cmp))
assert sorted_device_spec == device_specs, (
"With pipeline "
"parallelism, you must use gpu devices one after another "
"in the order of their ids.")
# Step3: add send and recv ops between section boundaries # Step3: add send and recv ops between section boundaries
self._insert_sendrecv_ops_for_boundaries(main_block) self._insert_sendrecv_ops_for_boundaries(main_block)
place_list = []
place_id_list = []
for dev_spec in device_specs:
if dev_spec == "cpu":
place_list.append(core.CPUPlace())
place_id_list.append(-1)
elif "gpu" in dev_spec and ":" in dev_spec:
dev_index = dev_spec.split(":")[1]
place_list.append(core.CUDAPlace(int(dev_index)))
place_id_list.append(int(dev_index))
else:
raise ValueError("Unknown device type: %s", dev_spec)
# Step4: split program into sections and add pairs of # Step4: split program into sections and add pairs of
# send and recv ops for data var. # send and recv ops for data var.
main_program = main_block.program main_program = main_block.program
program_list = self._split_program(main_program, device_specs) program_list = self._split_program(main_program, device_specs)
for p in program_list: for p in program_list:
self._create_vars(p["program"].block(0), main_program) self._create_vars(p["program"].block(0),
main_program.global_block())
self._insert_sendrecv_for_data_var(main_block, program_list, self._insert_sendrecv_for_data_var(main_block, program_list,
startup_program, device_specs) startup_program, device_specs)
...@@ -4452,7 +4484,13 @@ class PipelineOptimizer(object): ...@@ -4452,7 +4484,13 @@ class PipelineOptimizer(object):
isinstance(main_program._pipeline_opt, dict) and isinstance(main_program._pipeline_opt, dict) and
'local_rank' in main_program._pipeline_opt), \ 'local_rank' in main_program._pipeline_opt), \
"You must use pipeline with fleet" "You must use pipeline with fleet"
local_rank = main_program._pipeline_opt['local_rank'] local_rank = main_program._pipeline_opt['local_rank'] % len(
device_specs)
place_list = []
for dev_spec in device_specs:
dev_index = dev_spec.split(":")[1]
place_list.append(core.CUDAPlace(local_rank))
# Step7: Split startup program # Step7: Split startup program
new_startup_program = self._split_startup_program(startup_program, new_startup_program = self._split_startup_program(startup_program,
...@@ -4466,21 +4504,18 @@ class PipelineOptimizer(object): ...@@ -4466,21 +4504,18 @@ class PipelineOptimizer(object):
self._accumulate_gradients(program_list[local_rank]['program'] self._accumulate_gradients(program_list[local_rank]['program']
.global_block()) .global_block())
with open("startup_prog_%d" % local_rank, 'w') as f:
f.writelines(str(new_startup_program))
with open("main_prog_%d" % local_rank, 'w') as f:
f.writelines(str(program_list[local_rank]['program']))
startup_program._pipeline_opt = { startup_program._pipeline_opt = {
"startup_program": new_startup_program, "startup_program": new_startup_program,
} }
place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
main_program._pipeline_opt = { main_program._pipeline_opt = {
"trainer": "PipelineTrainer", "trainer": "PipelineTrainer",
"device_worker": "Section", "device_worker": "Section",
"inner_parallelism": len(device_specs), "inner_parallelism": len(device_specs),
"section_program": program_list[local_rank], "section_program": program_list[local_rank],
"place": place_list[local_rank], "place": place_list[local_rank],
"place_id": place_id_list[local_rank], "place_id": place_id,
"sync_steps": -1, "sync_steps": -1,
"num_microbatches": self._num_microbatches, "num_microbatches": self._num_microbatches,
"start_cpu_core_id": self._start_cpu_core_id, "start_cpu_core_id": self._start_cpu_core_id,
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import time
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
from paddle.fluid import core
import unittest
from multiprocessing import Process
import os
import signal
from functools import reduce
from test_dist_base import TestDistRunnerBase, runtime_main
import paddle.distributed.fleet as fleet
paddle.enable_static()
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
def cnn_model(data):
conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=data,
filter_size=5,
num_filters=20,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
SIZE = 10
input_shape = conv_pool_2.shape
param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
predict = fluid.layers.fc(
input=conv_pool_2,
size=SIZE,
act="softmax",
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01)))
return predict
class TestDistMnist2x2(TestDistRunnerBase):
def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None):
# Input data
device_id = 0
if dist_strategy:
fleet.init(is_collective=True)
with fluid.device_guard("gpu:0"):
images = fluid.layers.data(
name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if dist_strategy:
data_loader = fluid.io.DataLoader.from_generator(
feed_list=[images, label],
capacity=64,
use_double_buffer=False,
iterable=False)
# Train program
predict = cnn_model(images)
with fluid.device_guard("gpu:0"):
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
with fluid.device_guard("gpu:0"):
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
inference_program = fluid.default_main_program().clone()
base_lr = self.lr
passes = [30, 60, 80, 90]
steps_per_pass = 10
bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
opt = fluid.optimizer.Momentum(learning_rate=lr_val, momentum=0.9)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
if dist_strategy:
strategy = fleet.DistributedStrategy()
strategy.pipeline = True
dist_opt = fleet.distributed_optimizer(
optimizer=opt, strategy=strategy)
dist_opt.minimize(avg_cost)
else:
opt.minimize(avg_cost)
if dist_strategy:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader
else:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
if __name__ == "__main__":
runtime_main(TestDistMnist2x2)
...@@ -50,7 +50,7 @@ class TestFleetMetaOptimizer(unittest.TestCase): ...@@ -50,7 +50,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
strategy.pipeline = True strategy.pipeline = True
strategy.pipeline_configs = {'micro_batch': 2} strategy.pipeline_configs = {'micro_batch': 2}
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) optimizer = paddle.fluid.optimizer.Adam(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
......
...@@ -40,6 +40,14 @@ class TestPipeline(TestDistBase): ...@@ -40,6 +40,14 @@ class TestPipeline(TestDistBase):
check_error_log=True, check_error_log=True,
log_name=flag_name) log_name=flag_name)
def test_dist_train_one_device(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"pipeline_mnist_one_device.py",
check_error_log=True,
log_name=flag_name)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册