未验证 提交 a0b91c7b 编写于 作者: R Roc 提交者: GitHub

[Clean Fluid]Remove py_reader/double_buffer/create_py_reader_by_data/load from...

[Clean Fluid]Remove py_reader/double_buffer/create_py_reader_by_data/load from fluid.layer.io (#48589)


rm py_reader/double_buffer/create_py_reader_by_data/load

rm test_load_xpu
上级 33173ab4
此差异已折叠。
......@@ -51,7 +51,6 @@ from .dataloader.batch_sampler import _InfiniteIterableSampler
from .layers.io import (
monkey_patch_reader_methods,
_copy_reader_var_,
double_buffer,
)
from .unique_name import UniqueNameGenerator
from .framework import _get_paddle_place, _get_paddle_place_list
......@@ -1352,6 +1351,11 @@ class GeneratorLoader(DataLoaderBase):
self._use_double_buffer = use_double_buffer
self._capacity = capacity
if not self._iterable:
# Because layers.io.double_buffer is not supported anymore, and only when iterable and use_double_buffer
# are both True layers.io.double_buffer will be in use, here if itrable is False, use_double_buffer will be
# forcely set False to avoid using layers.io.double_buffer.
# TODO: keep use_double_buffer
self._use_double_buffer = False
self._init_non_iterable()
def _wait_thread_ends(self):
......@@ -1406,7 +1410,6 @@ class GeneratorLoader(DataLoaderBase):
'lod_tensor_blocking_queue'
)
reader_name = data_loader_unique_name_generator('create_py_reader')
double_buffer_name = data_loader_unique_name_generator('double_buffer')
var = global_scope().var(queue_name)
self._queue = core.init_lod_tensor_blocking_queue(
......@@ -1452,15 +1455,6 @@ class GeneratorLoader(DataLoaderBase):
reader = monkey_patch_reader_methods(main_prog_var)
if self._use_double_buffer:
double_buffer_reader = double_buffer(
reader, name=double_buffer_name
)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader
self._reader = reader
default_main_program().current_block().append_op(
......
......@@ -1079,7 +1079,6 @@ set_tests_properties(test_nan_inf PROPERTIES TIMEOUT 120)
set_tests_properties(test_deformable_conv_v1_op PROPERTIES TIMEOUT 300)
set_tests_properties(test_parallel_executor_transformer_auto_growth
PROPERTIES TIMEOUT 120)
set_tests_properties(test_py_reader_using_executor PROPERTIES TIMEOUT 120)
set_tests_properties(test_elementwise_add_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_weight_decay PROPERTIES TIMEOUT 120)
set_tests_properties(test_imperative_ptb_rnn_sorted_gradient PROPERTIES TIMEOUT
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
class TestLoadOp(unittest.TestCase):
"""Test load operator."""
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.ones = np.ones((4, 4)).astype('float32')
main_prog = fluid.Program()
start_prog = fluid.Program()
with fluid.program_guard(main_prog, start_prog):
input = fluid.data('input', shape=[-1, 4], dtype='float32')
output = layers.fc(
input,
4,
param_attr=fluid.ParamAttr(
name='w',
initializer=fluid.initializer.NumpyArrayInitializer(
self.ones
),
),
)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(start_prog)
paddle.distributed.io.save_persistables(
exe,
dirname=os.path.join(self.temp_dir.name, "./model"),
main_program=main_prog,
)
def tearDown(self):
self.temp_dir.cleanup()
def test_load(self):
main_prog = fluid.Program()
start_prog = fluid.Program()
with fluid.program_guard(main_prog, start_prog):
var = layers.create_tensor(dtype='float32')
layers.load(
var, file_path=os.path.join(self.temp_dir.name, './model/w')
)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(start_prog)
ret = exe.run(main_prog, fetch_list=[var.name])
np.testing.assert_array_equal(self.ones, ret[0])
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
@unittest.skipIf(
not paddle.is_compiled_with_xpu(), "core is not compiled with XPU"
)
class TestLoadOpXpu(unittest.TestCase):
"""Test load operator."""
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.model_path = os.path.join(self.temp_dir.name, "model")
self.ones = np.ones((4, 4)).astype('float32')
main_prog = fluid.Program()
start_prog = fluid.Program()
with fluid.program_guard(main_prog, start_prog):
input = fluid.data('input', shape=[-1, 4], dtype='float32')
output = layers.fc(
input,
4,
param_attr=fluid.ParamAttr(
name='w',
initializer=fluid.initializer.NumpyArrayInitializer(
self.ones
),
),
)
exe = fluid.Executor(fluid.XPUPlace(0))
exe.run(start_prog)
paddle.distributed.io.save_persistables(
exe, dirname=self.model_path, main_program=main_prog
)
def tearDown(self):
self.temp_dir.cleanup()
def test_load_xpu(self):
main_prog = fluid.Program()
start_prog = fluid.Program()
with fluid.program_guard(main_prog, start_prog):
var = layers.create_tensor(dtype='float32')
layers.load(var, file_path=self.model_path + '/w')
exe = fluid.Executor(fluid.XPUPlace(0))
exe.run(start_prog)
ret = exe.run(main_prog, fetch_list=[var.name])
np.testing.assert_array_equal(self.ones, ret[0])
if __name__ == "__main__":
unittest.main()
......@@ -105,41 +105,6 @@ class TestProgram(unittest.TestCase):
new_program = main_program.clone()
self.assertNotEqual(0, len(new_program.blocks[0].all_parameters()))
def test_program_inference_optimize(self):
def net():
reader = fluid.layers.py_reader(
capacity=10,
shapes=[[-1, 10], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
use_double_buffer=True,
)
in_data, label = fluid.layers.read_file(reader)
predict_label = fluid.layers.fc(in_data, size=2, act='softmax')
loss = paddle.mean(
fluid.layers.cross_entropy(input=predict_label, label=label)
)
optimizer = fluid.optimizer.Adam()
optimizer.minimize(loss)
startup_program = fluid.Program()
main_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
net()
no_read_program = main_program._inference_optimize()
keep_read_program = main_program._inference_optimize(
prune_read_op=False
)
no_read_ops = no_read_program.global_block().ops
keep_read_ops = keep_read_program.global_block().ops
self.assertEqual(len(keep_read_ops) - len(no_read_ops), 2)
self.assertEqual(keep_read_ops[0].type, 'create_double_buffer_reader')
self.assertEqual(keep_read_ops[1].type, 'read')
for i in range(len(no_read_ops)):
self.assertEqual(no_read_ops[i].type, keep_read_ops[i + 2].type)
def test_program_all_parameters(self):
program = fluid.default_main_program()
data = fluid.data(name='x', shape=[None, 13], dtype='float32')
......@@ -172,36 +137,6 @@ class TestProgram(unittest.TestCase):
TypeError, program._copy_dist_param_info_from, "program"
)
def test_remove_training_info(self):
def net():
reader = fluid.layers.py_reader(
capacity=10,
shapes=[[-1, 10], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
use_double_buffer=True,
)
in_data, label = fluid.layers.read_file(reader)
predict_label = fluid.layers.fc(in_data, size=2, act='softmax')
loss = paddle.mean(
fluid.layers.cross_entropy(input=predict_label, label=label)
)
optimizer = fluid.optimizer.Adam()
optimizer.minimize(loss)
main_program = fluid.Program()
with fluid.program_guard(main_program):
net()
removed_program = main_program._remove_training_info()
for i in range(removed_program.num_blocks):
block = removed_program.block(i)
for var in block.desc.all_vars():
self.assertFalse(var.has_is_parameter())
self.assertFalse(var.has_stop_gradient())
def build_program():
main_program = paddle.static.Program()
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
class TestPyReaderErrorMsg(unittest.TestCase):
def test_check_input_array(self):
fluid.reader.GeneratorLoader._check_input_array(
[
np.random.randint(100, size=[2]),
np.random.randint(100, size=[2]),
np.random.randint(100, size=[2]),
]
)
self.assertRaises(
TypeError,
fluid.reader.GeneratorLoader._check_input_array,
[
np.random.randint(100, size=[2]),
np.random.randint(100, size=[1]),
np.random.randint(100, size=[3]),
],
)
class TestDoubleBufferAPI(unittest.TestCase):
def test_double_buffer(self):
paddle.enable_static()
if fluid.core.is_compiled_with_cuda():
reader = fluid.layers.py_reader(
capacity=64,
shapes=[(-1, 1, 28, 28), (-1, 1)],
dtypes=['float32', 'int64'],
use_double_buffer=False,
)
reader = fluid.layers.double_buffer(
reader, place=fluid.core.CUDAPlace(0)
)
image, label = fluid.layers.read_file(reader)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
class TestLoDLevelShare(unittest.TestCase):
def setUp(self):
self.use_double_buffer = False
def test_lod_level_share(self):
reader = fluid.layers.py_reader(
capacity=16,
shapes=([-1, 256], [-1, 512], [-1, 100]),
dtypes=('float32', 'int64', 'double'),
lod_levels=(1, 2, 0),
use_double_buffer=self.use_double_buffer,
)
x, y, z = fluid.layers.read_file(reader)
self.assertEqual(x.lod_level, 1)
self.assertEqual(y.lod_level, 2)
self.assertEqual(z.lod_level, 0)
class TestLoDLevelShare2(TestLoDLevelShare):
def setUp(self):
self.use_double_buffer = True
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
def user_reader(inputs):
def _reader():
for d in inputs:
yield d
return _reader
def batch_feeder(batch_reader, pin_memory=False, img_dtype="float32"):
def _feeder():
for batch_data in batch_reader():
sample_batch = []
label_batch = []
for sample, label in batch_data:
sample_batch.append(sample)
label_batch.append([label])
tensor = core.LoDTensor()
label = core.LoDTensor()
place = core.CUDAPinnedPlace() if pin_memory else core.CPUPlace()
tensor.set(np.array(sample_batch, dtype=img_dtype), place)
label.set(np.array(label_batch, dtype="int64"), place)
yield [tensor, label]
return _feeder
class TestPyReader(unittest.TestCase):
def setUp(self):
self.capacity = 10
self.shapes = [(-1, 3, 2, 1), (-1, 1)]
self.lod_levels = [0, 0]
self.dtypes = ['float32', 'int64']
def test_pin_memory_pyreader(self):
with fluid.program_guard(fluid.Program(), fluid.Program()):
place = (
fluid.CUDAPlace(0)
if fluid.core.is_compiled_with_cuda()
else fluid.CPUPlace()
)
executor = fluid.Executor(place)
data_file = fluid.layers.py_reader(
capacity=self.capacity,
dtypes=self.dtypes,
lod_levels=self.lod_levels,
shapes=self.shapes,
)
# feed_queue = data_file.queue
read_out_data = fluid.layers.read_file(data_file)
self.inputs = []
for _ in range(10):
sample = np.random.uniform(
low=0, high=1, size=[3, 2, 1]
).astype("float32")
label = np.random.randint(low=0, high=10, dtype="int64")
self.inputs.append((sample, label))
self.input_tensors = []
for d, l in batch_feeder(
paddle.batch(user_reader(self.inputs), batch_size=2),
pin_memory=True
if fluid.core.is_compiled_with_cuda()
else False,
)():
ta = fluid.LoDTensorArray()
ta.append(d)
ta.append(l)
self.input_tensors.append(ta)
self.batched_inputs = []
for batch in paddle.batch(user_reader(self.inputs), batch_size=2)():
feed_d = []
feed_l = []
for d, l in batch:
feed_d.append(d)
feed_l.append([l])
self.batched_inputs.append([feed_d, feed_l])
data_file.decorate_tensor_provider(
batch_feeder(
paddle.batch(user_reader(self.inputs), batch_size=2),
pin_memory=True
if fluid.core.is_compiled_with_cuda()
else False,
)
)
executor.run(fluid.default_startup_program())
self.outputs = []
data_file.start()
for _ in self.input_tensors:
self.outputs.append(
executor.run(fetch_list=list(read_out_data))
)
data_file.reset()
self.validate()
def validate(self):
self.assertEqual(len(self.batched_inputs), len(self.outputs))
for in_data_list, out_data_list in zip(
self.batched_inputs, self.outputs
):
self.assertEqual(len(in_data_list), len(out_data_list))
in_data_list_np = [
np.array(in_lod_tensor) for in_lod_tensor in in_data_list
]
for in_data, out_data in zip(in_data_list_np, out_data_list):
self.assertTrue((in_data == out_data).all())
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from threading import Thread
import numpy as np
import paddle.fluid as fluid
def feed_data(feed_queue, inputs):
for in_data in inputs:
feed_queue.push(in_data)
class TestPyReader(unittest.TestCase):
def setUp(self):
self.capacity = 10
self.batch_size_min = 10
self.batch_size_max = 20
self.shapes = [(-1, 3, 2, 1), (-1, 1)]
self.lod_levels = [0, 0]
self.dtypes = ['float32', 'int64']
self.iterations = 20
def test_single_thread_main(self):
self.main(use_thread=False)
def test_multiple_thread_main(self):
self.main(use_thread=True)
def main(self, use_thread=False):
with fluid.program_guard(fluid.Program(), fluid.Program()):
place = (
fluid.CUDAPlace(0)
if fluid.core.is_compiled_with_cuda()
else fluid.CPUPlace()
)
executor = fluid.Executor(place)
data_file = fluid.layers.py_reader(
capacity=self.capacity,
dtypes=self.dtypes,
lod_levels=self.lod_levels,
shapes=self.shapes,
)
feed_queue = data_file.queue
read_out_data = fluid.layers.read_file(data_file)
self.inputs = []
for i in range(self.iterations):
in_data = fluid.LoDTensorArray()
batch_size = np.random.random_integers(
self.batch_size_min, self.batch_size_max
)
for shape, dtype in zip(self.shapes, self.dtypes):
next_data = np.random.uniform(
low=0, high=1000, size=(batch_size,) + shape[1:]
).astype(dtype)
in_data.append(
fluid.executor._as_lodtensor(next_data, place)
)
self.inputs.append(in_data)
executor.run(fluid.default_startup_program())
self.outputs = []
if use_thread:
thread = Thread(
target=feed_data, args=(feed_queue, self.inputs)
)
thread.start()
for in_data in self.inputs:
self.outputs.append(
executor.run(fetch_list=list(read_out_data))
)
else:
for in_data in self.inputs:
feed_queue.push(in_data)
self.outputs.append(
executor.run(fetch_list=list(read_out_data))
)
feed_queue.close()
self.validate()
def validate(self):
self.assertEqual(len(self.inputs), len(self.outputs))
for in_data_list, out_data_list in zip(self.inputs, self.outputs):
self.assertEqual(len(in_data_list), len(out_data_list))
in_data_list_np = [
np.array(in_lod_tensor) for in_lod_tensor in in_data_list
]
for in_data, out_data in zip(in_data_list_np, out_data_list):
self.assertTrue((in_data == out_data).all())
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import threading
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.unique_name as unique_name
from paddle.fluid import compiler
os.environ['CPU_NUM'] = str(4)
def as_tensor(np_array_or_tensor, place=None):
if isinstance(np_array_or_tensor, fluid.LoDTensor):
return np_array_or_tensor
if place is None:
place = fluid.CPUPlace()
tensor = fluid.LoDTensor()
tensor.set(np_array_or_tensor, place)
return tensor
def as_numpy(tensor_or_numpy):
return (
tensor_or_numpy
if isinstance(tensor_or_numpy, np.ndarray)
else np.array(tensor_or_numpy)
)
def sample_list_to_tensor_array(sample_list):
slot_num = None
slots = None
for sample in sample_list:
if slot_num is None:
slot_num = len(sample)
slots = [None] * len(sample)
else:
assert slot_num == len(sample)
for slot_id, slot_item in enumerate(sample):
if slots[slot_id] is None:
slots[slot_id] = []
slots[slot_id].append(slot_item)
tensor_array = fluid.LoDTensorArray()
for slot in slots:
t = fluid.LoDTensor()
t.set(np.array(slot), fluid.CPUPlace())
tensor_array.append(t)
return tensor_array
def feed_data(feed_queue, batch_reader):
data_generator = batch_reader()
while True:
data = next(data_generator, None)
if data is None or (len(data) == 1 and data[0] is None):
break
if not feed_queue.push(sample_list_to_tensor_array(data)):
break
feed_queue.close()
def simple_fc_net(
in_size,
class_num,
hidden_sizes,
batch_size,
queue_capacity,
use_double_buffer=False,
use_feed_list=True,
):
in_data = fluid.layers.data(name="data", dtype='float32', shape=[in_size])
label = fluid.layers.data(name='label', dtype='int64', shape=[1])
if use_feed_list:
py_reader = fluid.layers.create_py_reader_by_data(
capacity=queue_capacity,
use_double_buffer=use_double_buffer,
feed_list=[in_data, label],
name=unique_name.generate('py_reader_name'),
)
else:
py_reader = fluid.layers.py_reader(
capacity=queue_capacity,
shapes=[in_data.shape, label.shape],
dtypes=['float32', 'int64'],
name=unique_name.generate('py_reader_name'),
use_double_buffer=use_double_buffer,
)
in_data, label = fluid.layers.read_file(py_reader)
feed_queue = py_reader.queue
hidden = in_data
for hidden_size in hidden_sizes:
hidden = fluid.layers.fc(
hidden,
size=hidden_size,
act='tanh',
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=1.0)
),
)
predict_label = fluid.layers.fc(hidden, size=class_num, act='softmax')
loss = paddle.mean(
fluid.layers.cross_entropy(input=predict_label, label=label)
)
optimizer = fluid.optimizer.Adam()
optimizer.minimize(loss)
return in_data, label, loss, optimizer, feed_queue, py_reader
class TestPyReaderUsingExecutor(unittest.TestCase):
def setUp(self):
self.in_size = 1000
self.hidden_sizes = [50, 30, 20]
self.class_num = 10
self.batch_size = 32
self.iterations = 10
self.queue_capacity = 50
def test(self):
for use_cuda in (
[False, True] if core.is_compiled_with_cuda() else [False]
):
for use_parallel_executor in [False, True]:
for use_double_buffer in [False, True]:
for use_feed_list in [False, True]:
for use_decorate_paddle_reader in [False, True]:
print('Test Parameters:'),
print(
{
'use_cuda': use_cuda,
'use_parallel_executor': use_parallel_executor,
'use_double_buffer': use_double_buffer,
'use_feed_list': use_feed_list,
'use_decorate_paddle_reader': use_decorate_paddle_reader,
}
)
self.main(
use_cuda,
use_parallel_executor,
use_double_buffer,
use_feed_list,
use_decorate_paddle_reader,
)
def tensor_reader(self, use_decorate_paddle_reader):
def reader():
for sample_id in range(
self.batch_size * self.iterations * self.batch_size_times
):
in_data = np.random.uniform(
low=0, high=1, size=(self.in_size,)
).astype('float32')
label = np.random.random_integers(
low=0, high=self.class_num - 1, size=(1,)
).astype('int64')
reshaped_in_data = np.reshape(in_data, [1, -1])
reshaped_label = np.reshape(label, [1, -1])
if sample_id % (self.batch_size * self.batch_size_times) == 0:
self.inputs.append([reshaped_in_data, reshaped_label])
else:
self.inputs[-1][0] = np.concatenate(
(self.inputs[-1][0], reshaped_in_data), axis=0
)
self.inputs[-1][1] = np.concatenate(
(self.inputs[-1][1], reshaped_label), axis=0
)
yield in_data, label
if not use_decorate_paddle_reader:
yield None
return reader
def main(
self,
use_cuda=True,
use_parallel_executor=False,
use_double_buffer=False,
use_feed_list=False,
use_decorate_paddle_reader=False,
):
assert not use_cuda or use_cuda and core.is_compiled_with_cuda()
self.use_cuda = use_cuda
self.use_parallel_executor = use_parallel_executor
self.use_double_buffer = use_double_buffer
self.use_feed_list = use_feed_list
self.use_decorate_paddle_reader = use_decorate_paddle_reader
startup_program = fluid.Program()
main_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
(
in_data,
label,
loss,
optimizer,
feed_queue,
py_reader,
) = simple_fc_net(
in_size=self.in_size,
class_num=self.class_num,
hidden_sizes=self.hidden_sizes,
batch_size=self.batch_size,
queue_capacity=self.queue_capacity,
use_double_buffer=self.use_double_buffer,
use_feed_list=self.use_feed_list,
)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
train_cp = main_program
if use_parallel_executor:
train_cp = compiler.CompiledProgram(
main_program
).with_data_parallel(loss_name=loss.name)
if use_cuda:
self.batch_size_times = core.get_cuda_device_count()
else:
self.batch_size_times = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count())
)
else:
self.batch_size_times = 1
reader = self.tensor_reader(use_decorate_paddle_reader)
batch_reader = paddle.batch(reader, batch_size=self.batch_size)
self.inputs = []
self.outputs = []
if use_decorate_paddle_reader:
if use_feed_list:
py_reader.decorate_paddle_reader(batch_reader)
else:
py_reader.decorate_sample_list_generator(batch_reader)
py_reader.start()
else:
thread = threading.Thread(
target=feed_data, args=(feed_queue, batch_reader)
)
thread.daemon = True
thread.start()
try:
while True:
fetches = exe.run(
train_cp, fetch_list=[in_data.name, label.name]
)
fetches = [as_numpy(fetch) for fetch in fetches]
self.outputs.append(fetches)
except fluid.core.EOFException:
pass
feed_queue.close()
self.validate()
if use_decorate_paddle_reader:
py_reader.exited = True
py_reader.thread.join()
else:
thread.join()
def validate(self):
if not self.use_double_buffer:
self.assertEqual(len(self.inputs), len(self.outputs))
else:
self.assertTrue(len(self.inputs) >= len(self.outputs))
for idx in range(len(self.outputs)):
batch_in = self.inputs[idx]
batch_out = self.outputs[idx]
self.assertEqual(len(batch_in), len(batch_out))
if self.use_parallel_executor and not self.use_double_buffer:
self.validate_unordered_batch(batch_in, batch_out)
else:
for in_data, out_data in zip(batch_in, batch_out):
self.assertEqual(in_data.shape, out_data.shape)
if not self.use_parallel_executor:
self.assertTrue((in_data == out_data).all())
def validate_unordered_batch(self, batch_in, batch_out):
out_index_left_set = set(range(self.batch_size * self.batch_size_times))
mapping_num = 0
for i in range(self.batch_size * self.batch_size_times):
for j in out_index_left_set:
flag = True
for k in range(len(batch_in)):
in_data = batch_in[k][i]
out_data = batch_out[k][j]
if (in_data != out_data).any():
flag = False
break
if flag:
out_index_left_set.remove(j)
mapping_num += 1
break
self.assertEqual(mapping_num, self.batch_size * self.batch_size_times)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册