Unverified Commit e305771e authored by: K kangguangli, committed by: GitHub

[with_data_parallel][part1] remove with_data_parallel in unit test (#50351)

* process unit tests matching test_p*

* fix ci bug

* fix codestyle

* remove all tests about pe and restore some unrelated tests

* delete test_parallel_executor_test_while_train.py
Parent 842050f2
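The recurring change across these files replaces the deprecated with_data_parallel wrapper with a plain CompiledProgram, as the hunks below show. A minimal sketch of the migration, assuming the fluid static-graph API used by these tests (the tiny FC network, the CPU place, and the variable names are illustrative, not part of the commit):

import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.fluid import compiler

paddle.enable_static()

# Build a trivial static program with a loss to compile.
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    x = paddle.static.data(name='x', shape=[-1, 10], dtype='float32')
    fc = paddle.static.nn.fc(x, size=10)
    loss = paddle.mean(fc)
    fluid.optimizer.SGD(learning_rate=1e-3).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)

build_strategy = fluid.BuildStrategy()
# Before (the pattern removed in this commit):
#     train_cp = compiler.CompiledProgram(main).with_data_parallel(
#         loss_name=loss.name, build_strategy=build_strategy)
# After: compile the program directly; a BuildStrategy may still be passed.
train_cp = compiler.CompiledProgram(main, build_strategy=build_strategy)

feed = {'x': np.random.random([8, 10]).astype('float32')}
(loss_val,) = exe.run(train_cp, feed=feed, fetch_list=[loss.name])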
......@@ -427,7 +427,6 @@ list(REMOVE_ITEM TEST_OPS test_fetch_lod_tensor_array)
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_profiler)
list(REMOVE_ITEM TEST_OPS test_data_norm_op)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_transformer)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_transformer_auto_growth)
list(REMOVE_ITEM TEST_OPS test_bilinear_interp_op)
......@@ -828,10 +827,6 @@ if(NOT WIN32)
FLAGS_cudnn_deterministic=1)
py_test_modules(test_ir_memory_optimize_transformer MODULES
test_ir_memory_optimize_transformer)
# FIXME(zcd): temporally disable test_parallel_executor_fetch_feed in Windows CI because of the random failure.
py_test_modules(test_parallel_executor_fetch_feed MODULES
test_parallel_executor_fetch_feed)
set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 450)
endif()
add_subdirectory(sequence)
......@@ -889,22 +884,11 @@ add_subdirectory(ir)
add_subdirectory(standalone_executor)
if(WITH_TESTING)
set_property(TEST test_parallel_executor_mnist
PROPERTY ENVIRONMENT GLOG_vmodule=all_reduce_deps_pass=10)
set_property(TEST test_parallel_executor_fix_op_run_order
PROPERTY ENVIRONMENT GLOG_vmodule=fix_op_run_order_pass=10)
endif()
set_tests_properties(
test_parallel_executor_test_while_train
test_parallel_executor_mnist
test_parallel_executor_feed_persistable_var
test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
test_data_norm_op
test_dataloader_keep_order
test_dataloader_unkeep_order
test_parallel_executor_inference_feed_partial_data
test_parallel_ssa_graph_inference_feed_partial_data
test_fetch_unmerged
test_buffer_shared_memory_reuse_pass
......@@ -918,7 +902,6 @@ set_tests_properties(
test_distributed_fused_lamb_op_with_clip
test_distributed_fused_lamb_op_without_clip
test_distributed_fused_lamb_op_with_gradient_merge
test_parallel_executor_fetch_isolated_var
PROPERTIES LABELS "RUN_TYPE=DIST")
if(NOT WIN32 AND NOT APPLE)
......@@ -1068,7 +1051,6 @@ set_tests_properties(test_weight_decay PROPERTIES TIMEOUT 120)
set_tests_properties(test_imperative_ptb_rnn_sorted_gradient PROPERTIES TIMEOUT
120)
set_tests_properties(test_crop_tensor_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_executor_mnist PROPERTIES TIMEOUT 120)
set_tests_properties(test_imperative_ptb_rnn PROPERTIES TIMEOUT 120)
set_tests_properties(test_imperative_save_load_v2 PROPERTIES TIMEOUT 120)
set_tests_properties(test_conv2d_transpose_op PROPERTIES TIMEOUT 120)
......@@ -1138,8 +1120,6 @@ set_tests_properties(test_matmul_v2_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_slice_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_strided_slice_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_translated_layer PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_executor_inference_feed_partial_data
PROPERTIES TIMEOUT 120)
set_tests_properties(test_pad3d_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_dataloader_keep_order PROPERTIES TIMEOUT 120)
set_tests_properties(test_mean_op PROPERTIES TIMEOUT 120)
......@@ -1244,8 +1224,6 @@ if(WITH_CINN AND WITH_TESTING)
endif()
# ExecutionStrategy is deprecated in standalone executor
set_tests_properties(test_parallel_executor_dry_run
PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0")
set_tests_properties(test_parallel_executor_drop_scope
PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0")
......
......@@ -18,17 +18,5 @@ import paddle.fluid as fluid
fluid.core._set_eager_deletion_mode(0.0, 1.0, True)
# FIXME(zjl): It seems that this unittest fails randomly
# when comparing all reduce last loss and reduce last loss
# e.g.: AssertionError: 1.0357145 != 1.0673475 within 0.01 delta
# Disable it temporarily.
'''
from test_parallel_executor_mnist import TestMNIST
class EagerDeletionTestMNIST(TestMNIST):
pass
'''
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import unittest
import paddle
import paddle.fluid as fluid
from paddle.fluid import compiler
os.environ['CPU_NUM'] = str(4)
class TestBase(unittest.TestCase):
def main(
self,
network_func,
iter=10,
iter_per_pe=10,
use_gpu=True,
use_experimental_executor=False,
):
if use_gpu and not fluid.core.is_compiled_with_cuda():
logging.warning(
"Paddle is not compiled with CUDA, skip GPU unittests"
)
return
main_prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.Scope()
with fluid.program_guard(main_prog, startup_prog):
with fluid.scope_guard(scope):
loss = network_func()
exe = fluid.Executor(
fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
)
exe.run(startup_prog)
exe_strategy = fluid.ExecutionStrategy()
exe_strategy._dry_run = True
exe_strategy.use_experimental_executor = (
use_experimental_executor
)
train_cp = compiler.CompiledProgram(
main_prog
).with_data_parallel(
loss_name=loss.name, exec_strategy=exe_strategy
)
for _ in range(iter):
for _ in range(iter_per_pe):
exe.run(train_cp)
class TestMNISTDryRun(TestBase):
def test_mnist_dry_run(self):
for use_gpu in (False, True):
for use_experimental_executor in (False, True):
self.main(
network_func=TestMNISTDryRun.network_func,
use_gpu=use_gpu,
use_experimental_executor=use_experimental_executor,
)
@staticmethod
def network_func():
img = paddle.static.data(name='img', shape=[-1, 784], dtype='float32')
label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
hidden = img
for _ in range(10):
hidden = paddle.static.nn.fc(x=img, size=200, activation='tanh')
prediction = paddle.static.nn.fc(
x=hidden, size=10, activation='softmax'
)
loss = paddle.nn.functional.cross_entropy(
input=prediction, label=label, reduction='none', use_softmax=False
)
avg_loss = paddle.mean(loss)
fluid.optimizer.Adam().minimize(avg_loss)
return avg_loss
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from functools import partial
import numpy
from simple_nets import init_data, simple_fc_net
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
class TestFeedPersistableVar(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
batch_size = 4
cls.img, cls.label = init_data(
batch_size, img_shape=[784], label_range=9
)
cls.feed_dict = {
'image': cls.img,
'label': cls.label,
'learning_rate': numpy.array([1.0]).astype("float32"),
}
def optimizer(self):
learning_rate = paddle.static.create_global_var(
name="learning_rate",
shape=[1],
value=1.0,
dtype='float32',
persistable=True,
)
optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
return optimizer
def check_feed_persistable_var(self, feed_dict, use_cuda=False):
if use_cuda and not core.is_compiled_with_cuda():
return
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = simple_fc_net()
optimizer = self.optimizer()
optimizer.minimize(loss)
exe.run(program=startup)
compiled_prog = fluid.compiler.CompiledProgram(
main
).with_data_parallel(loss_name=loss.name)
exe.run(program=compiled_prog, feed=feed_dict)
def test_feed_persistable_var(self):
self.check_feed_persistable_var(self.feed_dict)
self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
self.feed_dict['learning_rate'] = numpy.array([1.0, 1.0]).astype(
"float32"
)
self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
self.feed_dict['learning_rate'] = numpy.array([1.0, 1.0]).astype(
"float32"
)
run = partial(self.check_feed_persistable_var, self.feed_dict)
self.assertRaises(RuntimeError, run)
self.feed_dict['image'] = self.img[0, :]
self.feed_dict['label'] = self.label[0, :]
run = partial(self.check_feed_persistable_var, self.feed_dict)
self.assertRaises(RuntimeError, run)
if __name__ == '__main__':
paddle.enable_static()
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid import compiler
def Lenet(data, class_dim):
conv1 = paddle.static.nn.conv2d(data, 4, 5, 1, act=None)
bn1 = paddle.static.nn.batch_norm(conv1, act='relu')
pool1 = paddle.nn.functional.max_pool2d(bn1, 2, 2)
conv2 = paddle.static.nn.conv2d(pool1, 16, 5, 1, act=None)
bn2 = paddle.static.nn.batch_norm(conv2, act='relu')
pool2 = paddle.nn.functional.max_pool2d(bn2, 2, 2)
fc1 = paddle.static.nn.fc(pool2, size=50, activation='relu')
fc2 = paddle.static.nn.fc(fc1, size=class_dim, activation='softmax')
return fc2
class TestFetchAndFeed(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
def parallel_exe(
self,
use_cuda,
run_parallel_exe,
use_faster_executor=False,
num_threads=4,
seed=1,
):
main_program = fluid.Program()
startup = fluid.Program()
startup.random_seed = seed
with fluid.program_guard(main_program, startup):
data = paddle.static.data(
name='image', shape=[-1, 3, 224, 224], dtype='float32'
)
label = paddle.static.data(
name='label', shape=[-1, 1], dtype='int64'
)
out = Lenet(data, class_dim=102)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label, reduction='none', use_softmax=False
)
loss = paddle.mean(loss)
opt = fluid.optimizer.Momentum(
learning_rate=0.1,
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4),
)
opt.minimize(loss)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
# FIXME force disable enable_inplace and memory_optimize to pass the unittest
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.memory_optimize = False
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_experimental_executor = use_faster_executor
exec_strategy.num_threads = num_threads
train_cp = compiler.CompiledProgram(main_program).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
exec_strategy=exec_strategy,
)
run_parallel_exe(train_cp, exe, use_cuda, data, label, loss)
def run_parallel_exe_with_fetch(
self, compiled_program, exe, use_cuda, data, label, loss
):
def get_data(batch_size=8):
np.random.seed(5)
while True:
img = np.random.random(size=[batch_size, 3, 224, 224]).astype(
np.float32
)
l = (np.random.random(size=[batch_size, 1]) * 10).astype(
np.int64
)
yield img, l
fetch_list = []
all_vars = compiled_program._program.global_block().vars
for k, v in all_vars.items():
if (
('tmp' not in k)
and (k[0] != '_' or v.persistable)
and v.type == core.VarDesc.VarType.LOD_TENSOR
):
fetch_list.append(k)
for batch_id, img_label in enumerate(get_data()):
img, l = img_label
train_inputs = {data.name: img, label.name: l}
ret = exe.run(
compiled_program,
fetch_list=fetch_list,
feed=train_inputs,
return_numpy=True,
)
for i in range(len(fetch_list)):
assert not math.isnan(np.sum(ret[i])) and not math.isinf(
np.sum(ret[i])
)
if batch_id == 2:
break
def run_parallel_exe_with_feed(
self, compiled_program, exe, use_cuda, data, label, loss
):
def get_data(batch_size=8):
np.random.seed(5)
while True:
train_data = []
for _ in range(batch_size):
img = np.random.random(size=[1, 3, 224, 224]).astype(
np.float32
)
label = (np.random.random(size=[1, 1]) * 10).astype(
np.int64
)
train_data.append([img, label])
yield train_data
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(get_data, multi_devices=True)
for batch_id, data in enumerate(reader()):
loss_np = exe.run(
compiled_program, feed=data, fetch_list=[loss.name]
)[0]
print(batch_id, loss_np)
if batch_id == 2:
break
def check_executor(self, use_faster_executor=False, num_threads=4):
if core.is_compiled_with_cuda():
self.parallel_exe(
use_cuda=True,
run_parallel_exe=self.run_parallel_exe_with_fetch,
use_faster_executor=use_faster_executor,
num_threads=num_threads,
)
self.parallel_exe(
use_cuda=False,
run_parallel_exe=self.run_parallel_exe_with_fetch,
use_faster_executor=use_faster_executor,
num_threads=num_threads,
)
def test_fetch(self):
for use_faster_executor in {True, False}:
self.check_executor(
use_faster_executor=use_faster_executor, num_threads=4
)
self.check_executor(
use_faster_executor=use_faster_executor, num_threads=1
)
def test_feed(self):
if core.is_compiled_with_cuda():
self.parallel_exe(
use_cuda=True, run_parallel_exe=self.run_parallel_exe_with_feed
)
self.parallel_exe(
use_cuda=False, run_parallel_exe=self.run_parallel_exe_with_feed
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
def enable_parallel_ssa_executor(enabled=True):
if fluid.is_compiled_with_cuda():
fluid.core.globals()['FLAGS_enable_parallel_graph'] = enabled
class TestParallelExecutorFetchIsolatedVarBase(unittest.TestCase):
def build_network(self, is_training):
x = fluid.data(name='x', shape=[-1, 10], dtype='float32')
y = fluid.data(name='y', shape=[-1, 10], dtype='float32')
fc = paddle.static.nn.fc(x, size=30, bias_attr=False)
loss = paddle.mean(fc)
if is_training:
adam = fluid.optimizer.Adam(learning_rate=1e-3)
adam.minimize(loss)
return loss, y
def exec_strategy(self, use_experimental_executor):
strategy = fluid.ExecutionStrategy()
strategy.use_experimental_executor = use_experimental_executor
return strategy
def places(self, use_gpu, dev_cnt):
if use_gpu:
return fluid.cuda_places(list(range(dev_cnt)))
else:
return fluid.cpu_places(dev_cnt)
def test_main(self):
for use_gpu in [False, True]:
for dev_cnt in [1, 2]:
for is_training in [False, True]:
for use_experimental_executor in [False, True]:
for use_parallel_ssa_executor in [False, True]:
func = lambda: self.run_impl(
use_gpu,
dev_cnt,
is_training,
use_experimental_executor,
use_parallel_ssa_executor,
)
self.run_func_with_guard(func)
def run_impl(
self,
use_gpu,
dev_cnt,
is_training,
use_experimental_executor,
use_parallel_ssa_executor,
):
paddle.enable_static()
enable_parallel_ssa_executor(use_parallel_ssa_executor)
if fluid.is_compiled_with_cuda():
if (
fluid.core.globals()['FLAGS_enable_parallel_graph']
and not use_gpu
):
return
# windows has only 1 GPU
if use_gpu and dev_cnt > 1 and os.name == "nt":
return
else:
if use_gpu:
return
loss, isolated_var = self.build_network(is_training)
loss_name = loss.name if is_training else None
places = self.places(use_gpu, dev_cnt)
exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
prog = fluid.CompiledProgram(
fluid.default_main_program()
).with_data_parallel(
loss_name=loss_name,
exec_strategy=self.exec_strategy(use_experimental_executor),
places=places,
)
BATCH_SIZE = 8 * dev_cnt
for _ in range(10):
x_np = np.random.random(size=[BATCH_SIZE, 10]).astype('float32')
y_np = np.random.random(size=[BATCH_SIZE, 10]).astype('float32')
_, y_np_fetch = exe.run(
prog,
feed={'x': x_np, 'y': y_np},
fetch_list=[loss, isolated_var],
)
np.testing.assert_array_equal(y_np, y_np_fetch)
enable_parallel_ssa_executor(False)
def run_func_with_guard(self, func):
with fluid.program_guard(fluid.Program(), fluid.Program()):
with fluid.unique_name.guard():
with fluid.scope_guard(fluid.Scope()):
func()
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.nn import CrossEntropyLoss
from paddle.vision.models import resnet18
class TestFixOpRunOrder(unittest.TestCase):
def setUp(self):
paddle.enable_static()
paddle.seed(1)
paddle.framework.random._manual_program_seed(1)
if paddle.is_compiled_with_cuda():
fluid.set_flags({'FLAGS_cudnn_deterministic': 1})
def get_place(self):
return (
paddle.CUDAPlace(0)
if paddle.is_compiled_with_cuda()
else paddle.CPUPlace()
)
def get_feed(self):
batch_size = 4
image = np.random.random([batch_size, 3, 224, 224]).astype('float32')
label = np.random.randint(0, 1000, [batch_size, 1]).astype('int64')
return {"image": image, "label": label}
def create_model(self, fix_op_run_order):
main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
scope = paddle.static.Scope()
with paddle.static.program_guard(main_prog, startup_prog):
image = paddle.static.data(
name="image", shape=[None, 3, 224, 224], dtype="float32"
)
label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64"
)
model = resnet18()
pred = model(image)
loss_fn = CrossEntropyLoss()
loss = loss_fn(pred, label)
optimizer = paddle.optimizer.SGD(learning_rate=1e-3)
optimizer.minimize(loss)
build_strategy = paddle.static.BuildStrategy()
build_strategy.fix_op_run_order = fix_op_run_order
build_strategy.fuse_bn_act_ops = True
build_strategy.fuse_bn_add_act_ops = True
main_prog = paddle.static.CompiledProgram(main_prog).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
places=[self.get_place()],
)
exe = paddle.static.Executor(self.get_place())
with paddle.static.scope_guard(scope):
exe.run(startup_prog)
return main_prog, scope, loss
def run_and_fetch_loss(self, main_prog, scope, loss, feed):
with paddle.static.scope_guard(scope):
exe = paddle.static.Executor(self.get_place())
loss_value = exe.run(main_prog, feed=feed, fetch_list=[loss])[0]
return loss_value
def test_main(self):
if not paddle.is_compiled_with_cuda():
return
main1, scope1, loss1 = self.create_model(True)
main2, scope2, loss2 = self.create_model(False)
for i in range(10):
feed = self.get_feed()
loss_val1 = self.run_and_fetch_loss(main1, scope1, loss1, feed)
loss_val2 = self.run_and_fetch_loss(main2, scope2, loss2, feed)
self.assertEqual(loss_val1, loss_val2)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.nn.functional as F
class TestInferencePartialFeed(unittest.TestCase):
def setUp(self):
self.iterations = 10
self.size = 10
def run_network(self, places, use_split, has_persistable):
startup_prog = fluid.Program()
main_prog = fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
x = fluid.data(name='x', shape=[None, self.size], dtype='float32')
y = fluid.data(name='y', shape=[None, self.size], dtype='float32')
if has_persistable:
lr = fluid.data(name='lr', shape=[1], dtype='float32')
lr.persistable = True
else:
lr = fluid.data(name='lr', shape=[None], dtype='float32')
relu_x = F.relu(x)
relu_y = F.relu(y)
relu_lr = F.relu(lr)
exe = fluid.Executor(places[0])
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog).with_data_parallel(
places=places
)
gen_random = lambda shape: np.random.uniform(
low=-1.0, high=1.0, size=shape
).astype('float32')
assert_result = lambda feed, result: np.testing.assert_array_equal(
np.maximum(0, feed), result
)
def assert_merged_unmerged(merged, unmerged):
unmerged = np.concatenate(unmerged, axis=0)
np.testing.assert_array_equal(merged, unmerged)
def feed_split_test():
for place_num in range(1, len(places) * 3):
x_np = gen_random([place_num, self.size])
y_np = gen_random([place_num, self.size])
if not lr.persistable or place_num <= len(places):
lr_np = gen_random([place_num])
else:
lr_np = gen_random([1])
feed = {x.name: x_np, y.name: y_np, lr.name: lr_np}
fetch_list = [relu_x, relu_y, relu_lr]
relu_x_np, relu_y_np, relu_lr_np = exe.run(
prog, feed=feed, fetch_list=fetch_list, return_merged=True
)
(
relu_x_np_unmerged,
relu_y_np_unmerged,
relu_lr_np_unmerged,
) = exe.run(
prog, feed=feed, fetch_list=fetch_list, return_merged=False
)
assert_merged_unmerged(relu_x_np, relu_x_np_unmerged)
assert_merged_unmerged(relu_y_np, relu_y_np_unmerged)
assert_merged_unmerged(relu_lr_np, relu_lr_np_unmerged)
assert_result(x_np, relu_x_np)
assert_result(y_np, relu_y_np)
if not lr.persistable or place_num <= len(places):
assert_result(lr_np, relu_lr_np)
else:
expected_relu_lr_np = max(lr_np[0], 0)
self.assertTrue(np.all(expected_relu_lr_np == relu_lr_np))
def feed_list_test():
for place_num in range(1, len(places) + 1):
x_np_list = []
y_np_list = []
lr_np_list = []
feed_list = []
for _ in range(place_num):
x_np = gen_random([1, self.size])
y_np = gen_random([1, self.size])
lr_np = gen_random([1])
x_np_list.append(x_np)
y_np_list.append(y_np)
lr_np_list.append(lr_np)
feed_list.append(
{x.name: x_np, y.name: y_np, lr.name: lr_np}
)
fetch_list = [relu_x, relu_y, relu_lr]
relu_x_np, relu_y_np, relu_lr_np = exe.run(
prog,
feed=feed_list,
fetch_list=fetch_list,
return_merged=True,
)
(
relu_x_np_unmerged,
relu_y_np_unmerged,
relu_lr_np_unmerged,
) = exe.run(
prog,
feed=feed_list,
fetch_list=fetch_list,
return_merged=False,
)
assert_merged_unmerged(relu_x_np, relu_x_np_unmerged)
assert_merged_unmerged(relu_y_np, relu_y_np_unmerged)
assert_merged_unmerged(relu_lr_np, relu_lr_np_unmerged)
x_np = np.concatenate(x_np_list)
y_np = np.concatenate(y_np_list)
lr_np = np.concatenate(lr_np_list)
assert_result(x_np, relu_x_np)
assert_result(y_np, relu_y_np)
assert_result(lr_np, relu_lr_np)
for _ in range(self.iterations):
if use_split:
feed_split_test()
else:
feed_list_test()
def test_main(self):
places = [fluid.cpu_places(4)]
if fluid.is_compiled_with_cuda():
places.append(fluid.cuda_places())
for p in places:
for has_persistable in [False, True]:
for use_split in [False, True]:
self.run_network(
p, use_split=use_split, has_persistable=has_persistable
)
class TestInferencePartialFeedUsingDataLoader(unittest.TestCase):
def setUp(self):
self.epoch_num = 3
self.batch_num = 101 # a prime number
self.batch_size = 32
def create_reader(self):
def __impl__():
for _ in range(self.batch_num):
yield np.random.random([self.batch_size, 1]).astype('float32'),
return __impl__
def run_network(self, iterable, use_cuda, drop_last):
x = fluid.data(shape=[None, 1], name='x', dtype='float32')
places = fluid.cuda_places() if use_cuda else fluid.cpu_places(4)
loader = fluid.io.DataLoader.from_generator(
feed_list=[x], capacity=16, iterable=iterable, drop_last=drop_last
)
y = paddle.static.nn.fc(x, size=10)
loss = paddle.mean(y)
exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
prog = fluid.CompiledProgram(
fluid.default_main_program()
).with_data_parallel(places=places, loss_name=loss.name)
loader.set_batch_generator(
self.create_reader(), places=places if iterable else None
)
for _ in range(self.epoch_num):
actual_batch_num = 0
if loader.iterable:
for feed_data in loader():
(x_data,) = exe.run(prog, feed=feed_data, fetch_list=[x])
self.assertEqual(x_data.shape[0] % self.batch_size, 0)
self.assertTrue(x_data.shape[0] != 0)
actual_batch_num += int(x_data.shape[0] / self.batch_size)
else:
loader.start()
try:
while True:
(x_data,) = exe.run(prog, fetch_list=[x])
self.assertEqual(x_data.shape[0] % self.batch_size, 0)
self.assertTrue(x_data.shape[0] != 0)
actual_batch_num += int(
x_data.shape[0] / self.batch_size
)
except fluid.core.EOFException:
loader.reset()
if not drop_last or len(places) == 1:
self.assertEqual(self.batch_num, actual_batch_num)
else:
self.assertGreater(self.batch_num, actual_batch_num)
def test_main(self):
use_cuda_list = (
[False, True] if fluid.is_compiled_with_cuda() else [False]
)
iterable_list = [False, True]
drop_last_list = [False, True]
for iterable in iterable_list:
for use_cuda in use_cuda_list:
for drop_last in drop_last_list:
with fluid.program_guard(fluid.Program(), fluid.Program()):
with fluid.scope_guard(fluid.Scope()):
self.run_network(iterable, use_cuda, drop_last)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import numpy as np
from parallel_executor_test_base import DeviceType, TestParallelExecutorBase
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
def simple_fc_net(use_feed):
img = paddle.static.data(name='image', shape=[-1, 784], dtype='float32')
label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
hidden = img
for _ in range(4):
hidden = paddle.static.nn.fc(
hidden,
size=200,
activation='tanh',
bias_attr=fluid.ParamAttr(
initializer=paddle.nn.initializer.Constant(value=1.0)
),
)
prediction = paddle.static.nn.fc(hidden, size=10, activation='softmax')
loss = paddle.nn.functional.cross_entropy(
input=prediction, label=label, reduction='none', use_softmax=False
)
loss = paddle.mean(loss)
return loss
def fc_with_batchnorm(use_feed):
img = paddle.static.data(name='image', shape=[-1, 784], dtype='float32')
label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
hidden = img
for _ in range(1):
with fluid.name_scope("hidden"):
hidden = paddle.static.nn.fc(
hidden,
size=200,
activation='tanh',
bias_attr=fluid.ParamAttr(
initializer=paddle.nn.initializer.Constant(value=1.0)
),
)
hidden = paddle.static.nn.batch_norm(input=hidden)
with fluid.name_scope("fc_layer"):
prediction = paddle.static.nn.fc(hidden, size=10, activation='softmax')
with fluid.name_scope("loss"):
loss = paddle.nn.functional.cross_entropy(
input=prediction, label=label, reduction='none', use_softmax=False
)
loss = paddle.mean(loss)
return loss
def init_data():
np.random.seed(5)
img = np.random.random(size=[32, 784]).astype(np.float32)
label = np.ones(shape=[32, 1], dtype='int64')
return img, label
class TestMNIST(TestParallelExecutorBase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
def _compare_reduce_and_allreduce(
self, model, use_device, delta1=1e-6, delta2=1e-4
):
if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = init_data()
(
all_reduce_first_loss,
all_reduce_last_loss,
_,
) = self.check_network_convergence(
model,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_reduce=False,
)
reduce_first_loss, reduce_last_loss, _ = self.check_network_convergence(
model,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_reduce=True,
)
for loss in zip(all_reduce_first_loss, reduce_first_loss):
self.assertAlmostEqual(loss[0], loss[1], delta=delta1)
for loss in zip(all_reduce_last_loss, reduce_last_loss):
self.assertAlmostEqual(loss[0], loss[1], delta=delta2)
# simple_fc
def check_simple_fc_convergence(self, use_device, use_reduce=False):
if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = init_data()
self.check_network_convergence(
simple_fc_net,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_reduce=use_reduce,
)
def test_simple_fc(self):
# use_device
self.check_simple_fc_convergence(DeviceType.CUDA)
self.check_simple_fc_convergence(DeviceType.CPU)
self.check_simple_fc_convergence(DeviceType.XPU)
def test_simple_fc_with_new_strategy(self):
# use_device, use_reduce
# NOTE: the computation result of nccl_reduce is non-deterministic,
# related issue: https://github.com/NVIDIA/nccl/issues/157
self._compare_reduce_and_allreduce(
simple_fc_net, DeviceType.CUDA, 1e-5, 1e-2
)
self._compare_reduce_and_allreduce(
simple_fc_net, DeviceType.CPU, 1e-5, 1e-2
)
def check_simple_fc_parallel_accuracy(self, use_device):
if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
return
img, label = init_data()
single_first_loss, single_last_loss, _ = self.check_network_convergence(
method=simple_fc_net,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_parallel_executor=False,
)
(
parallel_first_loss,
parallel_last_loss,
_,
) = self.check_network_convergence(
method=simple_fc_net,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_parallel_executor=True,
)
self.assertAlmostEqual(
np.mean(parallel_first_loss),
single_first_loss,
delta=1e-6,
)
self.assertAlmostEqual(
np.mean(parallel_last_loss), single_last_loss, delta=1e-6
)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(DeviceType.CUDA)
self.check_simple_fc_parallel_accuracy(DeviceType.CPU)
def check_batchnorm_fc_convergence(self, use_device, use_fast_executor):
if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = init_data()
self.check_network_convergence(
fc_with_batchnorm,
feed_dict={"image": img, "label": label},
use_device=use_device,
use_fast_executor=use_fast_executor,
)
def test_batchnorm_fc(self):
for use_device in (DeviceType.CPU, DeviceType.CUDA):
for use_fast_executor in (False, True):
self.check_batchnorm_fc_convergence(
use_device, use_fast_executor
)
def test_batchnorm_fc_with_new_strategy(self):
# NOTE: the computation result of nccl_reduce is non-deterministic,
# related issue: https://github.com/NVIDIA/nccl/issues/157
self._compare_reduce_and_allreduce(
fc_with_batchnorm, DeviceType.CUDA, 1e-5, 1e-2
)
self._compare_reduce_and_allreduce(
fc_with_batchnorm, DeviceType.CPU, 1e-5, 1e-2
)
class TestMNISTNoReduce(unittest.TestCase):
def run_program(self, device_type):
if device_type == DeviceType.CUDA:
if not paddle.is_compiled_with_cuda():
return
places = paddle.static.cuda_places()
else:
self.assertEqual(device_type, DeviceType.CPU)
places = paddle.static.cpu_places(4)
paddle.seed(10)
with paddle.fluid.unique_name.guard():
main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
loss = simple_fc_net(use_feed=True)
optimizer = paddle.optimizer.SGD(learning_rate=0.0)
optimizer.minimize(loss)
grads = [p.name + '@GRAD' for p in main.all_parameters()]
no_reduce = paddle.static.BuildStrategy.ReduceStrategy._NoReduce
build_strategy = paddle.static.BuildStrategy()
build_strategy.reduce_strategy = no_reduce
main_multi_place = paddle.static.CompiledProgram(
main
).with_data_parallel(
loss_name=loss.name, build_strategy=build_strategy, places=places
)
build_strategy = paddle.static.BuildStrategy()
build_strategy.reduce_strategy = no_reduce
main_single_place = paddle.static.CompiledProgram(
main.clone()
).with_data_parallel(
loss_name=loss.name, build_strategy=build_strategy, places=places[0]
)
image, label = init_data()
feed = {'image': image, 'label': label}
exe = paddle.static.Executor(places[0])
scope = paddle.static.Scope()
with paddle.static.scope_guard(scope):
exe.run(startup)
grads_multi_place = exe.run(
main_multi_place, feed=feed, fetch_list=[grads]
)
feeds = self.split_feed(feed, len(places))
grads_single_place = [list() for _ in range(len(grads))]
for f in feeds:
gs = exe.run(main_single_place, feed=f, fetch_list=[grads])
for i, g in enumerate(gs):
grads_single_place[i].append(g)
for i in range(len(grads)):
grads_single_place[i] = np.concatenate(
grads_single_place[i], axis=0
) / len(places)
self.assertEqual(len(grads_multi_place), len(grads_single_place))
for g1, g2 in zip(grads_multi_place, grads_single_place):
np.testing.assert_allclose(g1, g2, rtol=1e-05)
def split_feed(self, feed, n):
image = feed['image']
label = feed['label']
self.assertEqual(image.shape[0] % n, 0)
self.assertEqual(label.shape[0] % n, 0)
images = np.split(image, n)
labels = np.split(label, n)
return [{'image': images[i], 'label': labels[i]} for i in range(n)]
def test_main(self):
self.run_program(DeviceType.CUDA)
self.run_program(DeviceType.CPU)
if __name__ == '__main__':
paddle.enable_static()
unittest.main()
......@@ -103,7 +103,7 @@ def train(dot_save_dir, prefix, seed=1234):
build_strategy.debug_graphviz_path = os.path.join(dot_save_dir, prefix)
compiled_program = paddle.static.CompiledProgram(
main_program, build_strategy
).with_data_parallel(loss_name=loss.name)
)
iters = 100
feed = rand_data(img.name, label.name, iters)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
import sys
import unittest
import numpy as np
from simple_nets import simple_fc_net
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid import compiler
class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def check_network_convergence(self, use_cuda, build_strategy=None):
os.environ['CPU_NUM'] = str(4)
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = simple_fc_net()
test_program = main.clone(for_test=True)
opt = fluid.optimizer.SGD(learning_rate=0.001)
opt.minimize(loss)
batch_size = 32
image = np.random.normal(size=(batch_size, 784)).astype('float32')
label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
feed_dict = {'image': image, 'label': label}
train_cp = compiler.CompiledProgram(main).with_data_parallel(
loss_name=loss.name, build_strategy=build_strategy
)
test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
share_vars_from=train_cp,
)
for i in range(5):
exe.run(train_cp, feed=feed_dict, fetch_list=[loss.name])
(test_loss,) = exe.run(
test_cp, feed=feed_dict, fetch_list=[loss.name]
)
(train_loss,) = exe.run(
train_cp, feed=feed_dict, fetch_list=[loss.name]
)
avg_test_loss_val = np.array(test_loss).mean()
if math.isnan(float(avg_test_loss_val)):
sys.exit("got NaN loss, testing failed.")
avg_train_loss_val = np.array(train_loss).mean()
if math.isnan(float(avg_train_loss_val)):
sys.exit("got NaN loss, training failed.")
np.testing.assert_allclose(
train_loss, test_loss, rtol=1e-05, atol=0.01
)
def test_parallel_testing(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = (
fluid.BuildStrategy.ReduceStrategy.AllReduce
)
if core.is_compiled_with_cuda():
self.check_network_convergence(
use_cuda=True, build_strategy=build_strategy
)
self.check_network_convergence(
use_cuda=False, build_strategy=build_strategy
)
def test_parallel_testing_with_new_strategy_gpu(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = (
fluid.BuildStrategy.ReduceStrategy.Reduce
)
if core.is_compiled_with_cuda():
self.check_network_convergence(
use_cuda=True, build_strategy=build_strategy
)
def test_parallel_testing_with_new_strategy_cpu(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = (
fluid.BuildStrategy.ReduceStrategy.Reduce
)
self.check_network_convergence(
use_cuda=False, build_strategy=build_strategy
)
if __name__ == '__main__':
unittest.main()
......@@ -47,13 +47,11 @@ class TestPassBuilder(unittest.TestCase):
exe.run(startup)
feed_dict = {'image': image, 'label': label}
train_cp = compiler.CompiledProgram(main).with_data_parallel(
loss_name=loss.name, build_strategy=build_strategy
train_cp = compiler.CompiledProgram(
main, build_strategy=build_strategy
)
test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
share_vars_from=train_cp,
test_cp = compiler.CompiledProgram(
test_program, build_strategy=build_strategy
)
for i in range(5):
......
......@@ -131,9 +131,7 @@ class TestPrintOpBackward(unittest.TestCase):
exe = paddle.static.Executor(place)
exe.run(startup)
binary = paddle.static.CompiledProgram(main).with_data_parallel(
loss_name=loss.name
)
binary = paddle.static.CompiledProgram(main)
img, label = init_data()
feed_dict = {"image": img, "label": label}
......
......@@ -80,13 +80,7 @@ class TestProfiler(unittest.TestCase):
if compile_program:
# TODO(luotao): profiler tool may have bug with multi-thread parallel executor.
# https://github.com/PaddlePaddle/Paddle/pull/25200#issuecomment-650483092
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 1
train_program = fluid.compiler.CompiledProgram(
main_program
).with_data_parallel(
loss_name=avg_cost.name, exec_strategy=exec_strategy
)
train_program = fluid.compiler.CompiledProgram(main_program)
else:
train_program = main_program
return train_program, startup_program, avg_cost, batch_size, batch_acc
......
......@@ -13,7 +13,6 @@
# limitations under the License.
import contextlib
import os
import unittest
import numpy as np
......@@ -334,11 +333,7 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
sgd_optimizer.minimize(loss1)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_program)
compiled_prog = fluid.CompiledProgram(
program
).with_data_parallel(
loss_name=loss1.name, places=fluid.CPUPlace()
)
compiled_prog = fluid.CompiledProgram(program)
weight_init = np.array(
scope.find_var(w_param_attrs.name).get_tensor()
)
......@@ -543,11 +538,7 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
label_np = np.random.randint(1, size=(10, 1)).astype(
'int64'
)
compiled_prog = fluid.CompiledProgram(
program
).with_data_parallel(
loss_name=loss1.name, places=fluid.CPUPlace()
)
compiled_prog = fluid.CompiledProgram(program)
for i in range(10):
res = exe.run(
compiled_prog,
......@@ -621,88 +612,6 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
np.testing.assert_array_equal(weight_with_prune, weight_expected)
self.assertFalse(np.array_equal(weight_without_prune, weight_expected))
def test_prune_with_multi_devices(self):
'''
When training model with multi_devices, the pruned CompiledProgram should share same local scopes.
This test the correctness.
'''
exe = fluid.Executor(fluid.CPUPlace())
program = framework.Program()
startup_program = framework.Program()
scope = fluid.Scope()
os.environ['CPU_NUM'] = str(2)
# do not use_prune
with fluid.scope_guard(scope):
with fluid.program_guard(program, startup_program):
(
x1,
x2,
y1,
y2,
label,
loss1,
loss2,
w1_param_attrs,
w2_param_attrs,
) = self.net2()
adam_optimizer1 = fluid.optimizer.AdamOptimizer(
learning_rate=0.5
)
train1 = adam_optimizer1.minimize(loss1)
cloned_program = program.clone()
adam_optimizer2 = fluid.optimizer.AdamOptimizer(
learning_rate=0.5
)
train2 = adam_optimizer2.minimize(loss2)
exe.run(startup_program)
x_np = np.random.random(size=(10, 2)).astype('float32')
label_np = np.random.randint(1, size=(10, 1)).astype('int64')
compiled_prog1 = fluid.CompiledProgram(
program
).with_data_parallel(
loss_name=loss1.name, places=[fluid.CPUPlace()] * 2
)
compiled_prog2 = fluid.CompiledProgram(
program
).with_data_parallel(
loss_name=loss2.name, places=[fluid.CPUPlace()] * 2
)
for i in range(10):
if i % 2 == 1:
res = exe.run(
compiled_prog1,
feed=[
{'x1': x_np[0:5, :], 'label': label_np[0:5, :]},
{'x1': x_np[5:, :], 'label': label_np[5:, :]},
],
fetch_list=[loss1.name, train1],
use_prune=True,
)
else:
res = exe.run(
compiled_prog2,
feed={'x2': x_np, 'label': label_np},
fetch_list=[loss2.name, train2],
use_prune=True,
)
weight1 = np.array(
scope.find_var(w1_param_attrs.name).get_tensor()
)
# expected
scope = fluid.Scope()
with fluid.scope_guard(scope):
exe.run(startup_program)
for i in range(10):
if i % 2 == 1:
exe.run(
cloned_program,
feed={'x1': x_np, 'x2': x_np, 'label': label_np},
fetch_list=[loss1.name],
use_prune=False,
)
weight2 = np.array(scope.find_var(w1_param_attrs.name).get_tensor())
np.testing.assert_allclose(weight1, weight2, rtol=1e-05)
def test_prune_program_with_tupe_in_fetch_list(self):
'''
If there are multiple optimizers in the program, we can run specific one by
......
......@@ -202,7 +202,6 @@ def test_main(use_cuda, use_py_func_op, use_parallel_executor):
train_cp = compiler.CompiledProgram(
fluid.default_main_program()
)
train_cp = train_cp.with_data_parallel(loss_name=loss.name)
fetch_list = [loss.name]
else:
fetch_list = [loss]
......
......@@ -513,7 +513,6 @@ HIGH_PARALLEL_JOB_NEW = [
'test_new_group_api',
'test_dist_fleet_heter_base',
'test_collective_split_col_linear',
'test_parallel_executor_mnist',
'test_dist_fleet_ctr2',
'test_dist_fleet_heter_program',
'test_dist_fleet_ctr',
......@@ -928,7 +927,6 @@ FOURTH_HIGH_PARALLEL_JOB_NEW = [
'test_one_hot_op',
'test_adaptive_max_pool1d',
'test_label_smooth_op',
'test_parallel_executor_fetch_feed',
'test_cast',
'test_parallel_dygraph_sync_batch_norm',
'test_collect_fpn_proposals_op',
......@@ -1443,7 +1441,6 @@ FOURTH_HIGH_PARALLEL_JOB_NEW = [
'test_nan_inf',
'test_fuse_bn_add_act_pass',
'test_unpool_op',
'test_parallel_executor_dry_run',
'test_layer_norm_op_v2',
'test_embedding_id_stop_gradient',
'test_mkldnn_fc_act_fuse_pass',
......@@ -1510,11 +1507,7 @@ FOURTH_HIGH_PARALLEL_JOB_NEW = [
'test_bilateral_slice_op',
'test_inplace_abn_op',
'test_fetch_unmerged',
'test_parallel_executor_feed_persistable_var',
'test_parallel_executor_fetch_isolated_var',
'test_parallel_executor_inference_feed_partial_data',
'test_parallel_executor_seresnext_base_gpu',
'test_parallel_executor_test_while_train',
'test_parallel_executor_seresnext_with_fuse_all_reduce_gpu',
'test_parallel_ssa_graph_inference_feed_partial_data',
'test_parallel_executor_seresnext_with_reduce_gpu',
......@@ -2341,7 +2334,6 @@ TETRAD_PARALLEL_JOB = [
'test_trt_conv3d_op',
'test_parallel_executor_drop_scope',
'test_tensorrt_engine',
'test_parallel_executor_mnist',
'test_load_state_dict_from_old_format',
'test_fuse_elewise_add_act_pass',
'test_fetch_unmerged',
......@@ -2349,7 +2341,6 @@ TETRAD_PARALLEL_JOB = [
'test_standalone_controlflow',
'test_standalone_multiply_write',
'test_reshape_op',
'test_parallel_executor_fetch_isolated_var',
'test_inplace_abn_op',
'test_fused_transformer_encoder_layer',
'test_eager_deletion_while_op',
......@@ -2702,7 +2693,6 @@ TWO_PARALLEL_JOB = [
'test_sequence_expand',
'test_pool2d_bf16_mkldnn_op',
'test_bilinear_api',
'test_parallel_executor_inference_feed_partial_data',
'test_initializer_nn',
'test_modified_huber_loss_op',
'test_lookup_table_op',
......@@ -2961,7 +2951,6 @@ TWO_PARALLEL_JOB = [
'test_is_empty_op',
'test_py_reader_pin_memory',
'test_train_recognize_digits',
'test_parallel_executor_feed_persistable_var',
'test_update_loss_scaling_op',
'test_rnn_cell_api',
'test_imperative_load_static_param',
......@@ -3020,7 +3009,6 @@ TWO_PARALLEL_JOB = [
'test_profiler',
'test_ir_memory_optimize_pass',
'test_callback_reduce_lr_on_plateau',
'test_parallel_executor_dry_run',
'test_paddle_save_load',
'test_stack_op',
'test_overlap_add_op',
......@@ -3231,7 +3219,6 @@ TWO_PARALLEL_JOB = [
'test_bert',
'test_simple_rnn_op',
'trt_resnext_test',
'test_parallel_executor_fix_op_run_order',
'test_imperative_double_grad',
'test_cycle_gan',
'test_pretrained_model',
......
......@@ -370,12 +370,7 @@ STATIC_MODE_TESTING_LIST = [
'test_pad_op',
'test_pairwise_distance',
'test_parallel_executor_drop_scope',
'test_parallel_executor_dry_run',
'test_parallel_executor_feed_persistable_var',
'test_parallel_executor_inference_feed_partial_data',
'test_parallel_executor_mnist',
'test_parallel_executor_run_load_infer_program',
'test_parallel_executor_test_while_train',
'test_parallel_ssa_graph_inference_feed_partial_data',
'test_parameter',
'test_partial_concat_op',
......@@ -546,7 +541,6 @@ STATIC_MODE_TESTING_LIST = [
'test_parallel_executor_seresnext_with_reduce_cpu',
'test_parallel_executor_seresnext_with_fuse_all_reduce_cpu',
'test_layers',
'test_parallel_executor_fetch_feed',
'test_sequence_concat',
'test_sequence_conv',
'test_sequence_enumerate_op',
......