Unverified commit d850022d authored by kangguangli, committed by GitHub

rm parallel executor related unittests (#51700)

Parent commit: ab3b87a6
......@@ -443,7 +443,6 @@ list(REMOVE_ITEM TEST_OPS test_parallel_executor_seresnext_with_reduce_cpu)
list(REMOVE_ITEM TEST_OPS
test_parallel_executor_seresnext_with_fuse_all_reduce_cpu)
list(REMOVE_ITEM TEST_OPS test_imperative_ocr_attention_model)
list(REMOVE_ITEM TEST_OPS test_async_ssa_graph_executor_mnist)
list(REMOVE_ITEM TEST_OPS test_install_check)
list(REMOVE_ITEM TEST_OPS test_fuse_all_reduce_pass)
list(REMOVE_ITEM TEST_OPS test_fuse_bn_act_pass)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import unittest
import numpy
import paddle
import paddle.fluid as fluid
# Mini-batch size shared by the training and evaluation readers below.
BATCH_SIZE = 64
def convolutional_neural_network(use_py_reader):
    """Build a small conv net (2x conv-pool + batch-norm + softmax FC) for MNIST.

    Args:
        use_py_reader: when True, also create a non-iterable DataLoader
            feeding ``img``/``label``; otherwise the last return value is None.

    Returns:
        Tuple ``(array, img, label, prediction, avg_loss, acc, py_reader)``
        where ``array`` is a tensor array holding [prediction, acc].
    """
    with fluid.unique_name.guard():
        img = paddle.static.data(
            name='img', shape=[-1, 1, 28, 28], dtype='float32'
        )
        label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')

        # Optional feed pipeline; ops must be inserted before the layers below.
        loader = None
        if use_py_reader:
            loader = fluid.io.DataLoader.from_generator(
                capacity=64,
                feed_list=[img, label],
                iterable=False,
                use_double_buffer=False,
            )

        # First conv-pool stage followed by batch normalization.
        features = fluid.nets.simple_img_conv_pool(
            input=img,
            filter_size=5,
            num_filters=20,
            pool_size=2,
            pool_stride=2,
            act="relu",
        )
        features = paddle.static.nn.batch_norm(features)
        # Second conv-pool stage.
        features = fluid.nets.simple_img_conv_pool(
            input=features,
            filter_size=5,
            num_filters=50,
            pool_size=2,
            pool_stride=2,
            act="relu",
        )
        prediction = paddle.static.nn.fc(
            x=features, size=10, activation='softmax'
        )

        loss = paddle.nn.functional.cross_entropy(
            input=prediction, label=label, reduction='none', use_softmax=False
        )
        avg_loss = paddle.mean(loss)
        acc = paddle.static.accuracy(input=prediction, label=label)

        # Stash prediction and accuracy in a tensor array so the caller can
        # verify that array reads round-trip the fetched values.
        idx = fluid.layers.zeros(shape=[1], dtype='int64')
        array = paddle.tensor.array_write(x=prediction, i=idx)
        paddle.increment(idx)
        paddle.tensor.array_write(x=acc, i=idx, array=array)

        return array, img, label, prediction, avg_loss, acc, loader
def test():
    """Evaluate the CPU-built network on the MNIST test split.

    Asserts that mean accuracy exceeds 0.96 over the whole test set.
    """
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)

    test_reader = paddle.batch(
        paddle.dataset.mnist.test(), batch_size=BATCH_SIZE
    )

    (
        array,
        img,
        label,
        prediction,
        avg_loss,
        acc,
        py_reader,
    ) = convolutional_neural_network(use_py_reader=False)

    feeder = fluid.DataFeeder(feed_list=[img, label], place=place)

    def run_eval(program, feed, reader):
        # Accumulate per-batch accuracy and loss, then average.
        acc_vals = []
        loss_vals = []
        for batch in reader():
            acc_np, loss_np = exe.run(
                program=program,
                feed=feed.feed(batch),
                fetch_list=[acc, avg_loss],
            )
            acc_vals.append(float(acc_np))
            loss_vals.append(float(loss_np))
        return numpy.array(loss_vals).mean(), numpy.array(acc_vals).mean()

    avg_loss_val, acc_val = run_eval(
        fluid.default_main_program(), feeder, test_reader
    )
    print("Test: avg_cost: %s, acc: %s" % (avg_loss_val, acc_val))
    assert acc_val > 0.96
def train(use_cuda, thread_num, cpu_num):
    """Train the conv net for 2 passes with the async SSA graph executor.

    Args:
        use_cuda: run on GPU 0 when paddle is a CUDA build; if True on a
            CPU-only build, prints a message and returns None.
        thread_num: number of executor threads.
        cpu_num: exported as the CPU_NUM environment variable, which controls
            how many places the ParallelExecutor replicates over.

    Returns:
        Number of mini-batches executed in the final pass, or None when CUDA
        was requested but unavailable.
    """
    if use_cuda and not fluid.core.is_compiled_with_cuda():
        print("paddle is not compiled with cuda, exit!")
        return

    (
        array,
        img,
        label,
        prediction,
        avg_loss,
        acc,
        py_reader,
    ) = convolutional_neural_network(use_py_reader=True)
    print("build convolutional neural network done.")

    optimizer = fluid.optimizer.Adam(learning_rate=0.001)
    optimizer.minimize(avg_loss)
    print("Adam optimizer minimize done.")

    train_reader = paddle.batch(
        paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
        batch_size=BATCH_SIZE,
    )
    print("declared train reader done.")

    # BUGFIX: the place and executor were hard-coded to CPU even when the
    # caller passed use_cuda=True (and the CUDA build check above passed).
    # Honor the flag; use_cuda=False callers get exactly the old behavior.
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    exe = fluid.Executor(place)

    print("going to run startup program")
    exe.run(fluid.default_startup_program())
    print("run startup program done.")

    # CPU_NUM must be set before the ParallelExecutor is constructed so it
    # picks up the intended number of places.
    os.environ['CPU_NUM'] = str(cpu_num)
    print("cpu_num:" + str(cpu_num))
    print("thread_num:" + str(thread_num))

    build_strategy = fluid.BuildStrategy()
    build_strategy.async_mode = True  # enable the async SSA graph executor

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.num_threads = thread_num
    exec_strategy.num_iteration_per_run = 10

    main_program = fluid.default_main_program()
    pe = fluid.ParallelExecutor(
        use_cuda=use_cuda,
        loss_name=avg_loss.name,
        main_program=main_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy,
    )
    print("declare parallel executor done.")

    py_reader.set_sample_list_generator(train_reader)

    for pass_id in range(2):
        step = 0
        py_reader.start()
        try:
            while True:
                array_v, acc_v, prediction_v, loss_val = pe.run(
                    fetch_list=[array, acc, prediction, avg_loss.name]
                )
                # The tensor array written by the network must round-trip
                # the fetched prediction and accuracy values exactly.
                assert numpy.allclose(array_v[0], prediction_v)
                assert numpy.allclose(array_v[1], acc_v)

                loss_val = numpy.mean(loss_val)
                if step % 10 == 0:
                    print(
                        "Pass %d, Batch %d, Cost %f, queue size %d"
                        % (pass_id, step, loss_val, py_reader.queue.size())
                    )
                step += 1
        except fluid.core.EOFException:
            # Reader exhausted: finish this pass and rearm the loader.
            print("train end pass = " + str(pass_id))
            py_reader.reset()

    return step
class TestAsyncSSAGraphExecutor(unittest.TestCase):
    """Trains with 1, 2 and 4 CPU places, then evaluates.

    Since each pass consumes a fixed dataset, the per-pass step count should
    roughly halve every time the place count doubles.
    """

    def test_check_async_ssa_exe_train(self):
        step_list = []
        for cpu_num in (1, 2, 4):
            print(f"run cpu_num -> {cpu_num}")
            # Fresh scope and programs per configuration so runs are isolated.
            with fluid.scope_guard(fluid.core.Scope()):
                with fluid.program_guard(
                    main_program=fluid.Program(),
                    startup_program=fluid.Program(),
                ):
                    start_time = time.time()
                    step = train(
                        use_cuda=False, thread_num=cpu_num, cpu_num=cpu_num
                    )
                    elapsed = time.time() - start_time
                    step_list.append(step)
                    print(
                        f"cpu_num -> {cpu_num} step -> {step}"
                        f" time -> {elapsed}"
                    )
        # Evaluate in clean programs after training.
        with fluid.program_guard(
            main_program=fluid.Program(),
            startup_program=fluid.Program(),
        ):
            test()
        # Doubling the place count should roughly halve the steps per pass.
        assert abs(int(step_list[0] / 2) - int(step_list[1])) < 5
        assert abs(int(step_list[1] / 2) - int(step_list[2])) < 5
# Standard unittest entry point.
if __name__ == "__main__":
    unittest.main()
......@@ -592,64 +592,6 @@ class EagerDeletionTwoRecurrentOpsTest(EagerDeletionRecurrentOpTest1):
return rnn_1()
class EagerDeletionRecurrentOpParallelExecutorTest(
    EagerDeletionRecurrentOpTest1
):
    '''
    Test RNNOp with ParallelExecutor
    equation:
        h_t = ( x_t + h_{t-1} ) / scale
    vars:
        - x
    memories:
        - h
    outputs:
        - h
    '''

    def forward(self):
        """Run the forward pass through a ParallelExecutor and return the fetched output."""
        # Feed each data field with a tensor built from the reference
        # Python RNN's value. NOTE(review): py_rnn, data_field, place and
        # main_program are assumed to be set up by the base class — confirm.
        self.feed_map = {
            x: create_tensor(getattr(self.py_rnn, x), self.place)
            for x in self.data_field
        }
        build_strategy = fluid.BuildStrategy()
        # Exercise in-place variable reuse together with eager deletion.
        build_strategy.enable_inplace = True
        exec_strategy = fluid.ExecutionStrategy()
        parallel_exe = fluid.ParallelExecutor(
            use_cuda=False,
            main_program=self.main_program,
            build_strategy=build_strategy,
            exec_strategy=exec_strategy,
        )
        out = parallel_exe.run(feed=self.feed_map, fetch_list=[self.output])
        return out[0]

    def backward(self):
        """Run the backward pass and return the gradients of every data field."""
        self.feed_map = {
            x: create_tensor(getattr(self.py_rnn, x), self.place)
            for x in self.data_field
        }
        # Fetch the gradient variable corresponding to each input field.
        fetch_list = [
            self.main_program.global_block().var(grad_var_name(x))
            for x in self.data_field
        ]
        build_strategy = fluid.BuildStrategy()
        build_strategy.enable_inplace = True
        exec_strategy = fluid.ExecutionStrategy()
        parallel_exe = fluid.ParallelExecutor(
            use_cuda=False,
            loss_name=self.output.name,
            main_program=self.main_program,
            build_strategy=build_strategy,
            exec_strategy=exec_strategy,
        )
        # return_numpy=False keeps LoD information on the fetched tensors.
        return parallel_exe.run(
            feed=self.feed_map, fetch_list=fetch_list, return_numpy=False
        )
class EagerDeletionFarwardOnlyRnnAndBackwardRnnTest(
EagerDeletionRecurrentOpTest1
):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.