# test_multiprocess_dataloader_static.py
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import six
import time
import unittest
import numpy as np

21
import paddle
22
import paddle.fluid as fluid
23
from paddle.io import DataLoader, Dataset
24

# Number of passes over the dataset in every training run.
EPOCH_NUM = 3
# Samples per batch produced by the DataLoader (or per dataset item for the
# pre-batched dataset).
BATCH_SIZE = 8
# Length of each flat float32 feature vector ("image").
IMAGE_SIZE = 32
# Total number of samples in the synthetic datasets.
SAMPLE_NUM = 100
# Number of label classes / width of the softmax output layer.
CLASS_NUM = 10


class RandomDataset(Dataset):
    """Map-style dataset of deterministic random ``(image, label)`` samples.

    Each sample is generated from a NumPy RNG seeded with the sample's own
    index, so ``dataset[idx]`` always returns the same values no matter which
    process fetches it or in which order. The tests rely on this when they
    compare losses between runs with different ``num_workers``.
    """

    def __init__(self, sample_num, class_num):
        self.sample_num = sample_num
        self.class_num = class_num

    def __getitem__(self, idx):
        # A private RandomState seeded with ``idx`` yields the same draws as
        # seeding the global NumPy RNG, without clobbering global RNG state.
        rng = np.random.RandomState(idx)
        image = rng.random_sample([IMAGE_SIZE]).astype('float32')
        # ``randint``'s upper bound is exclusive: pass ``class_num`` so that
        # every class in [0, class_num - 1] can be produced.
        label = rng.randint(0, self.class_num, (1, )).astype('int64')
        return image, label

    def __len__(self):
        return self.sample_num


def simple_fc_net_static():
    """Build a small static-graph classifier and its Adam training step.

    The network feeds ``image`` through three tanh FC layers (10, 20 and 30
    units), then a softmax FC layer with CLASS_NUM outputs. The loss is the
    mean cross-entropy against ``label``. Every weight and bias uses a
    constant initializer (0.8 and 0.5), and both programs set
    ``random_seed = 1``, so repeated builds start from identical parameters.

    Returns:
        tuple: ``(startup_prog, main_prog, image, label, loss)``. ``image``
        and ``label`` are the feed variables; ``loss`` is the scalar loss
        variable to fetch.
    """
    startup_prog = fluid.Program()
    main_prog = fluid.Program()
    startup_prog.random_seed = 1
    main_prog.random_seed = 1

    # Fresh unique-name context so parameter names do not depend on what
    # else has been built in this process.
    with fluid.unique_name.guard():
        with fluid.program_guard(main_prog, startup_prog):
            image = fluid.data(name='image',
                               shape=[None, IMAGE_SIZE],
                               dtype='float32')
            label = fluid.data(name='label', shape=[None, 1], dtype='int64')
            hidden = image
            param_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=0.8))
            bias_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=0.5))
            for hidden_size in [10, 20, 30]:
                hidden = fluid.layers.fc(hidden,
                                         size=hidden_size,
                                         act='tanh',
                                         param_attr=param_attr,
                                         bias_attr=bias_attr)

            predict_label = fluid.layers.fc(hidden,
                                            size=CLASS_NUM,
                                            act='softmax',
                                            param_attr=param_attr,
                                            bias_attr=bias_attr)
            loss = fluid.layers.reduce_mean(
                fluid.layers.cross_entropy(input=predict_label, label=label))

            optimizer = fluid.optimizer.Adam()
            optimizer.minimize(loss)
    return startup_prog, main_prog, image, label, loss


def prepare_places(with_data_parallel, with_cpu=False, with_gpu=True):
    """Return the place configurations the DataLoader tests run on.

    Every element of the returned list is itself a list of places. Each one
    is passed unchanged as ``places`` to a single test run.

    Args:
        with_data_parallel: also add multi-place configurations. These are
            two CPU places (when ``with_cpu``) and the first two GPUs (when
            more than one GPU is visible).
        with_cpu: include CPU configurations.
        with_gpu: include GPU configurations. Ignored unless Paddle was
            compiled with CUDA.

    Returns:
        list: a list of place lists. It is empty when no configuration
        applies, e.g. with the default ``with_cpu=False`` on a build without
        CUDA.
    """
    places = []
    if with_cpu:
        places.append([fluid.CPUPlace()])
        if with_data_parallel:
            places.append([fluid.CPUPlace()] * 2)

    if with_gpu and fluid.core.is_compiled_with_cuda():
        # Never use more than two GPUs, even on larger machines.
        gpu_places = fluid.cuda_places()[:2]
        assert len(gpu_places) > 0, "no gpu detected"
        if with_data_parallel and len(gpu_places) > 1:
            places.append(gpu_places)
        places.append([gpu_places[0]])
    return places


class TestStaticDataLoader(unittest.TestCase):
    """Static-graph DataLoader must train identically with and without
    worker subprocesses, with and without persistent workers."""

    def run_main(self, num_workers, places, persistent_workers, use_pe=True):
        """Train the FC net for EPOCH_NUM epochs and collect run statistics.

        Args:
            num_workers: number of DataLoader worker processes (0 loads in
                the main process).
            places: list of places to feed; ``places[0]`` also hosts the
                executor.
            persistent_workers: forwarded to ``DataLoader``.
            use_pe: run through ``fluid.CompiledProgram`` (data-parallel when
                there is more than one place) instead of the plain program.

        Returns:
            dict: ``"time"`` (seconds spent in the training loop), ``"step"``
            (batches seen per epoch) and ``"loss"`` (np.ndarray holding the
            mean loss of every step).
        """
        scope = fluid.Scope()
        with fluid.scope_guard(scope):
            startup_prog, main_prog, image, label, loss = simple_fc_net_static()

            dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(dataset,
                                    feed_list=[image, label],
                                    places=places,
                                    num_workers=num_workers,
                                    batch_size=BATCH_SIZE,
                                    return_list=False,
                                    drop_last=True,
                                    persistent_workers=persistent_workers)
            # drop_last=True discards the trailing partial batch.
            self.assertEqual(len(dataloader), SAMPLE_NUM // BATCH_SIZE)

            exe = fluid.Executor(place=places[0])
            exe.run(startup_prog)

            if use_pe:
                prog = fluid.CompiledProgram(main_prog)
                if len(places) > 1:
                    prog = prog.with_data_parallel(loss_name=loss.name,
                                                   places=places)
            else:
                prog = main_prog

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in six.moves.range(EPOCH_NUM):
                step = 0
                for d in dataloader:
                    # With return_list=False every batch is a list holding one
                    # feed dict per place.
                    self.assertEqual(len(d), len(places))
                    for i, item in enumerate(d):
                        # Separate names so the `image`/`label` feed variables
                        # are not shadowed by the fetched tensors.
                        image_t = item['image']
                        label_t = item['label']
                        self.assertEqual(image_t.shape(),
                                         [BATCH_SIZE, IMAGE_SIZE])
                        self.assertEqual(label_t.shape(), [BATCH_SIZE, 1])
                        self.assertTrue(image_t._place()._equals(places[i]))
                        self.assertTrue(label_t._place()._equals(places[i]))
                    L, = exe.run(program=prog,
                                 feed=d,
                                 fetch_list=[loss],
                                 use_program_cache=True)
                    loss_list.append(np.mean(L))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list)
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret

    def test_main(self):
        # NOTE(review): prepare_places(True) adds no CPU configurations
        # (with_cpu defaults to False), so on a build without CUDA this loop
        # body never executes.
        for p in prepare_places(True):
            for persistent_workers in [True, False]:
                results = []
                for num_workers in [0, 2]:
                    print(self.__class__.__name__, p, num_workers,
                          persistent_workers)
                    sys.stdout.flush()
                    ret = self.run_main(num_workers=num_workers,
                                        places=p,
                                        persistent_workers=persistent_workers)
                    results.append(ret)
                # Largest per-step relative loss difference between the
                # in-process (0 workers) and multi-process (2 workers) runs.
                diff = np.max(
                    np.abs(results[0]['loss'] - results[1]['loss']) /
                    np.abs(results[0]['loss']))
                self.assertLess(diff, 1e-2)


class TestStaticDataLoaderReturnList(unittest.TestCase):
    """Check the batch layout a static-mode DataLoader yields with
    ``return_list=True``, for single-place and two-place setups."""

    def _check_return_list(self, num_workers, expect_per_place_lists,
                           **loader_kwargs):
        """Build a ``return_list=True`` DataLoader and check every batch.

        Each batch must be a 2-element list (image, label). Whether each of
        those two entries is itself a list must equal
        ``expect_per_place_lists``.

        Args:
            num_workers: number of DataLoader worker processes.
            expect_per_place_lists: expected ``isinstance(entry, list)`` result
                for both batch entries.
            **loader_kwargs: extra ``DataLoader`` arguments (e.g. ``places``).
        """
        scope = fluid.Scope()
        image = fluid.data(name='image',
                           shape=[None, IMAGE_SIZE],
                           dtype='float32')
        label = fluid.data(name='label', shape=[None, 1], dtype='int64')
        with fluid.scope_guard(scope):
            dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(dataset,
                                    feed_list=[image, label],
                                    num_workers=num_workers,
                                    batch_size=BATCH_SIZE,
                                    drop_last=True,
                                    return_list=True,
                                    **loader_kwargs)

            for d in dataloader:
                self.assertIsInstance(d, list)
                self.assertEqual(len(d), 2)
                self.assertEqual(isinstance(d[0], list),
                                 expect_per_place_lists)
                self.assertEqual(isinstance(d[1], list),
                                 expect_per_place_lists)

    def run_single_place(self, num_workers):
        """With default places, each batch entry must not be a list."""
        self._check_return_list(num_workers, expect_per_place_lists=False)

    def run_multi_place(self, num_workers):
        """With two CPU places, each batch entry must be a list (presumably
        one element per place)."""
        self._check_return_list(num_workers,
                                expect_per_place_lists=True,
                                places=[fluid.CPUPlace()] * 2)

    def test_main(self):
        paddle.enable_static()
        for num_workers in [0, 2]:
            self.run_single_place(num_workers)
            self.run_multi_place(num_workers)

class RandomBatchedDataset(Dataset):
    """Dataset whose every item is already a full batch of BATCH_SIZE samples.

    Item ``idx`` is generated from a NumPy RNG seeded with ``idx``, so it is
    identical no matter which process fetches it. The returned arrays are
    stacked to shapes ``[BATCH_SIZE, IMAGE_SIZE]`` (float32) and
    ``[BATCH_SIZE, 1]`` (int64).
    """

    def __init__(self, sample_num, class_num):
        # One item per complete batch; a trailing partial batch is dropped.
        self.sample_num = int(sample_num // BATCH_SIZE)
        self.class_num = class_num

    def __getitem__(self, idx):
        # Private RandomState: same draws as seeding the global NumPy RNG,
        # without mutating global RNG state.
        rng = np.random.RandomState(idx)
        images = []
        labels = []
        for _ in range(BATCH_SIZE):
            images.append(rng.random_sample([IMAGE_SIZE]).astype('float32'))
            # ``randint``'s upper bound is exclusive: pass ``class_num`` so
            # every class in [0, class_num - 1] can be produced.
            labels.append(
                rng.randint(0, self.class_num, (1, )).astype('int64'))
        return np.stack(images, axis=0), np.stack(labels, axis=0)

    def __len__(self):
        return self.sample_num


class TestStaticDataLoaderWithBatchedDataset(TestStaticDataLoader):
    """Rerun the inherited ``test_main`` with a dataset whose items are
    already full batches, loaded with ``batch_size=None``."""

    def run_main(self, num_workers, places, persistent_workers):
        """Train the FC net on RandomBatchedDataset for EPOCH_NUM epochs.

        Unlike the base class there is no ``use_pe`` switch: the program is
        always wrapped in ``fluid.CompiledProgram``.

        Args:
            num_workers: number of DataLoader worker processes.
            places: list of places to feed; ``places[0]`` also hosts the
                executor.
            persistent_workers: forwarded to ``DataLoader``.

        Returns:
            dict: ``"time"``, ``"step"`` and ``"loss"``, in the same layout as
            the base class's ``run_main``.
        """
        scope = fluid.Scope()
        with fluid.scope_guard(scope):
            startup_prog, main_prog, image, label, loss = simple_fc_net_static()

            dataset = RandomBatchedDataset(SAMPLE_NUM, CLASS_NUM)
            # batch_size=None: items are fed as produced by the dataset; the
            # shape checks below confirm each fed tensor is one batch.
            dataloader = DataLoader(dataset,
                                    feed_list=[image, label],
                                    places=places,
                                    num_workers=num_workers,
                                    batch_size=None,
                                    return_list=False,
                                    drop_last=True,
                                    persistent_workers=persistent_workers)
            self.assertEqual(len(dataloader), SAMPLE_NUM // BATCH_SIZE)

            exe = fluid.Executor(place=places[0])
            exe.run(startup_prog)

            prog = fluid.CompiledProgram(main_prog)
            if len(places) > 1:
                prog = prog.with_data_parallel(loss_name=loss.name,
                                               places=places)

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in six.moves.range(EPOCH_NUM):
                step = 0
                for d in dataloader:
                    # One feed dict per place (return_list=False).
                    self.assertEqual(len(d), len(places))
                    for i, item in enumerate(d):
                        # Separate names keep the feed variables unshadowed.
                        image_t = item['image']
                        label_t = item['label']
                        self.assertEqual(image_t.shape(),
                                         [BATCH_SIZE, IMAGE_SIZE])
                        self.assertEqual(label_t.shape(), [BATCH_SIZE, 1])
                        self.assertTrue(image_t._place()._equals(places[i]))
                        self.assertTrue(label_t._place()._equals(places[i]))
                    L, = exe.run(program=prog,
                                 feed=d,
                                 fetch_list=[loss],
                                 use_program_cache=True)
                    loss_list.append(np.mean(L))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list)
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret


# Allow running this file directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()