未验证 提交 34aebbce 编写于 作者: Y Yuang Liu 提交者: GitHub

add precision unitest for executor all reduce (#33339)

上级 82630f38
......@@ -713,6 +713,7 @@ if (WITH_DISTRIBUTE)
set_tests_properties(test_dist_fleet_ctr2 PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_fleet_sparse_embedding_ctr PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_fleet_infer PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_fleet_raw_program_optimizer PROPERTIES TIMEOUT 120)
endif()
if (WITH_DISTRIBUTE AND NOT APPLE)
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_dist_base import TestDistRunnerBase, runtime_main
import unittest
import paddle
import os
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import numpy as np
from functools import reduce
import paddle.fluid as fluid
paddle.enable_static()
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
def cnn_model(data):
conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=data,
filter_size=5,
num_filters=20,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
SIZE = 10
input_shape = conv_pool_2.shape
param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
predict = fluid.layers.fc(
input=conv_pool_2,
size=SIZE,
act="softmax",
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01)))
return predict
class TestFleetMetaOptimizerPrecision(TestDistRunnerBase):
def get_model(self, batch_size=2, single_device=False):
# Input data
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Train program
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
test_program = fluid.default_main_program().clone(for_test=True)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
optimizer = paddle.fluid.optimizer.Adam(0.01)
if single_device:
optimizer.minimize(avg_cost)
else:
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.without_graph_optimization = True
optimizer = fleet.distributed_optimizer(
optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
return test_program, avg_cost, train_reader, test_reader, batch_acc, predict
if __name__ == "__main__":
runtime_main(TestFleetMetaOptimizerPrecision)
......@@ -186,6 +186,76 @@ class TestDistRunnerBase(object):
fleet.save_inference_model(exe, infer_save_dir_fleet,
feeded_var_names, [avg_cost])
def run_use_fleet_api_20_trainer(self, args):
"""
1. remove codes for DistributedStrategy and leave the DistributedStrategy part to get_model()
2. to run with fleet 2.0 api, set flags _use_fleet_api and _use_fleet_api_20 to True
3. for now, not support test for model save
"""
assert args.update_method == "nccl2" or "bkcl"
self.lr = args.lr
print_to_err("use_fleet 2.0", "fleet.node_num:")
test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
self.get_model(batch_size=args.batch_size)
if fluid.core.is_compiled_with_cuda():
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(device_id)
elif fluid.core.is_compiled_with_xpu():
device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
place = fluid.XPUPlace(device_id)
else:
raise ValueError(
"fleet dygraph api must in paddlepaddle-xpu or paddlepaddle-gpu."
)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
eprint(type(self).__name__, "run worker startup program done.")
feed_var_list = [
var
for var in fluid.default_main_program().global_block().vars.values()
if var.is_data
]
eprint("feed_var_list:", feed_var_list)
if feed_var_list[0].name == 'label':
feed_var_list = feed_var_list[::-1]
feeder = fluid.DataFeeder(feed_var_list, place)
reader_generator = train_reader()
def get_data():
origin_batch = next(reader_generator)
if args.update_method != "local" and args.use_reader_alloc:
new_batch = []
for offset, item in enumerate(origin_batch):
if offset % 2 == args.trainer_id:
new_batch.append(item)
return new_batch
else:
return origin_batch
print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = []
for i in six.moves.xrange(RUN_STEP):
loss, = exe.run(fluid.default_main_program(),
fetch_list=[avg_cost.name],
feed=feeder.feed(get_data()))
out_losses.append(loss[0])
print_to_err(type(self).__name__, "run step %d finished" % i)
print_to_err(type(self).__name__, "trainer run finished")
print_to_err(type(self).__name__, "dist losses: {}".format(out_losses))
if six.PY2:
print(pickle.dumps(out_losses))
else:
sys.stdout.buffer.write(pickle.dumps(out_losses))
def run_use_fleet_api_trainer(self, args):
assert args.update_method == "nccl2" or "bkcl"
......@@ -630,6 +700,7 @@ def runtime_main(test_class):
parser.add_argument('--use_hallreduce', action='store_true')
parser.add_argument('--use_pipeline', action='store_true')
parser.add_argument('--use_fleet_api', action='store_true')
parser.add_argument('--use_fleet_api_20', action='store_true')
parser.add_argument('--use_local_sgd', action='store_true')
parser.add_argument('--ut4grad_allreduce', action='store_true')
parser.add_argument(
......@@ -671,6 +742,8 @@ def runtime_main(test_class):
model.run_pserver(args)
elif args.use_fleet_api:
model.run_use_fleet_api_trainer(args)
elif args.use_fleet_api_20:
model.run_use_fleet_api_20_trainer(args)
elif args.use_pipeline:
model.run_pipeline_trainer(args)
else:
......@@ -734,6 +807,7 @@ class TestDistBase(unittest.TestCase):
self._nccl_comm_num = 1
self._enable_backward_deps = False
self._use_fleet_api = False
self._use_fleet_api_20 = False
self._use_local_sgd = False
self._ut4grad_allreduce = False
self._use_hallreduce = False
......@@ -1060,7 +1134,7 @@ class TestDistBase(unittest.TestCase):
tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce)
if self._use_fleet_api:
tr_cmd += " --use_fleet_api"
tr_cmd += " --use_fleet_api_20" if self._use_fleet_api_20 else " --use_fleet_api"
if self._use_local_sgd:
tr_cmd += " --use_local_sgd"
if self._ut4grad_allreduce:
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test_dist_base import TestDistBase
import paddle
import os
paddle.enable_static()
flag_name = os.path.splitext(__file__)[0]
class TestFleetMetaOptimizerPrecision(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._use_reader_alloc = False
self._nccl2_mode = True
self._nccl2_reduce_layer = True
self._use_fleet_api = True
self._use_fleet_api_20 = True
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"dist_fleet_raw_program_optimizer.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册