# test_multiprocess_dataloader_static.py
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import time
import unittest
import numpy as np

import paddle
import paddle.fluid as fluid
from paddle.io import DataLoader, Dataset

# Shared settings for the data loader tests in this module.
EPOCH_NUM = 3  # passes over the dataset per training run
BATCH_SIZE = 8
IMAGE_SIZE = 32  # length of each flat float32 "image" feature vector
# 100 is not a multiple of BATCH_SIZE; loaders built with drop_last=True
# therefore yield SAMPLE_NUM // BATCH_SIZE (= 12) full batches.
SAMPLE_NUM = 100
CLASS_NUM = 10


class RandomDataset(Dataset):
    """Map-style dataset of deterministic random ``(image, label)`` samples.

    Sample ``idx`` is drawn from a RNG seeded with ``idx``, so the same index
    always produces the same sample no matter which process (the main process
    or a DataLoader worker) generates it. This keeps runs with different
    ``num_workers`` comparable.

    Args:
        sample_num (int): number of samples reported by ``__len__``.
        class_num (int): labels are drawn uniformly from ``[0, class_num)``.
    """

    def __init__(self, sample_num, class_num):
        self.sample_num = sample_num
        self.class_num = class_num

    def __getitem__(self, idx):
        # A private RandomState seeded with ``idx`` yields the same stream as
        # ``np.random.seed(idx)`` without clobbering the process-wide NumPy RNG.
        rng = np.random.RandomState(idx)
        image = rng.random_sample([IMAGE_SIZE]).astype('float32')
        # ``high`` is exclusive: pass ``class_num`` (not ``class_num - 1``) so
        # that every class in ``[0, class_num)`` can actually be sampled.
        label = rng.randint(0, self.class_num, (1,)).astype('int64')
        return image, label

    def __len__(self):
        return self.sample_num


def simple_fc_net_static():
    """Build a small static-graph MLP classifier used by the loader tests.

    Network: ``image`` -> three tanh FC layers (10, 20, 30 units) -> softmax FC
    over ``CLASS_NUM`` classes. It is trained on the mean cross-entropy with
    Adam. Every weight and bias uses a constant initializer, and both programs
    use a fixed ``random_seed``, so separately built copies start out
    identical.

    Returns:
        tuple: ``(startup_prog, main_prog, image, label, loss)``. ``image`` and
        ``label`` are the feed variables and ``loss`` is the mean loss to fetch.
    """
    startup_prog = fluid.Program()
    main_prog = fluid.Program()
    startup_prog.random_seed = 1
    main_prog.random_seed = 1

    # Fresh unique-name state so repeated builds get identical variable names.
    with fluid.unique_name.guard():
        with fluid.program_guard(main_prog, startup_prog):
            image = fluid.data(
                name='image', shape=[None, IMAGE_SIZE], dtype='float32'
            )
            label = fluid.data(name='label', shape=[None, 1], dtype='int64')
            hidden = image
            param_attr = fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=0.8)
            )
            bias_attr = fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=0.5)
            )
            for hidden_size in [10, 20, 30]:
                hidden = fluid.layers.fc(
                    hidden,
                    size=hidden_size,
                    act='tanh',
                    param_attr=param_attr,
                    bias_attr=bias_attr,
                )

            predict_label = fluid.layers.fc(
                hidden,
                size=CLASS_NUM,
                act='softmax',
                param_attr=param_attr,
                bias_attr=bias_attr,
            )
            loss = fluid.layers.reduce_mean(
                fluid.layers.cross_entropy(input=predict_label, label=label)
            )

            optimizer = fluid.optimizer.Adam()
            optimizer.minimize(loss)
    return startup_prog, main_prog, image, label, loss


def prepare_places(with_data_parallel, with_cpu=False, with_gpu=True):
    """Return the place lists the static loader tests should run on.

    Each entry is itself a list of places suitable for
    ``DataLoader(places=...)``.

    Args:
        with_data_parallel (bool): also add a multi-device entry (two CPU
            places, and/or the first two GPUs when more than one is visible).
        with_cpu (bool): include CPU place lists.
        with_gpu (bool): include GPU place lists. Ignored when Paddle is not
            compiled with CUDA.

    Returns:
        list[list]: the place lists. This may be empty, e.g. on a CPU-only
        build with the default ``with_cpu=False``.
    """
    places = []
    if with_cpu:
        places.append([fluid.CPUPlace()])
        if with_data_parallel:
            places.append([fluid.CPUPlace()] * 2)

    if with_gpu and fluid.core.is_compiled_with_cuda():
        gpu_places = fluid.cuda_places()[:2]
        assert len(gpu_places) > 0, "no gpu detected"
        if with_data_parallel and len(gpu_places) > 1:
            places.append(gpu_places)
        places.append([gpu_places[0]])
    return places


class TestStaticDataLoader(unittest.TestCase):
    """Train ``simple_fc_net_static`` through a static-graph ``DataLoader``.

    ``test_main`` runs identical training with ``num_workers`` 0 and 2 (each
    with and without persistent workers) and checks the per-step losses agree.
    """

    def run_main(self, num_workers, places, persistent_workers, use_pe=True):
        """Train for ``EPOCH_NUM`` epochs and collect the per-step losses.

        Args:
            num_workers (int): ``DataLoader`` worker processes (0 means the
                main process loads the data).
            places (list): places the loader feeds. ``places[0]`` hosts the
                executor.
            persistent_workers (bool): forwarded to ``DataLoader``.
            use_pe (bool): run through ``fluid.CompiledProgram`` (data parallel
                when more than one place is given) instead of the raw program.

        Returns:
            dict: ``"time"`` (seconds spent training), ``"step"`` (steps per
            epoch) and ``"loss"`` (np.ndarray with the mean loss of each step).
        """
        scope = fluid.Scope()
        with fluid.scope_guard(scope):
            startup_prog, main_prog, image, label, loss = simple_fc_net_static()

            dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(
                dataset,
                feed_list=[image, label],
                places=places,
                num_workers=num_workers,
                batch_size=BATCH_SIZE,
                return_list=False,
                drop_last=True,
                persistent_workers=persistent_workers,
            )
            # drop_last=True discards the trailing partial batch.
            assert len(dataloader) == SAMPLE_NUM // BATCH_SIZE

            exe = fluid.Executor(place=places[0])
            exe.run(startup_prog)

            if use_pe:
                prog = fluid.CompiledProgram(main_prog)
                if len(places) > 1:
                    prog = prog.with_data_parallel(
                        loss_name=loss.name, places=places
                    )
            else:
                prog = main_prog

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in range(EPOCH_NUM):
                step = 0
                for d in dataloader:
                    # With return_list=False every batch holds one
                    # {feed name: tensor} entry per place.
                    assert len(d) == len(places), "{} != {}".format(
                        len(d), len(places)
                    )
                    # Separate names so the feed variables are not shadowed.
                    for i, item in enumerate(d):
                        image_t = item['image']
                        label_t = item['label']
                        assert image_t.shape() == [BATCH_SIZE, IMAGE_SIZE]
                        assert label_t.shape() == [BATCH_SIZE, 1]
                        assert image_t._place()._equals(places[i])
                        assert label_t._place()._equals(places[i])
                    (step_loss,) = exe.run(
                        program=prog,
                        feed=d,
                        fetch_list=[loss],
                        use_program_cache=True,
                    )
                    loss_list.append(np.mean(step_loss))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list),
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret

    def test_main(self):
        # These programs are static graphs. Switch modes explicitly instead of
        # relying on another test case having done so earlier.
        paddle.enable_static()
        # NOTE(review): with these arguments prepare_places only returns GPU
        # place lists, so on a CPU-only build this loop does not run.
        for p in prepare_places(True):
            for persistent_workers in [True, False]:
                results = []
                for num_workers in [0, 2]:
                    print(
                        self.__class__.__name__,
                        p,
                        num_workers,
                        persistent_workers,
                    )
                    sys.stdout.flush()
                    ret = self.run_main(
                        num_workers=num_workers,
                        places=p,
                        persistent_workers=persistent_workers,
                    )
                    results.append(ret)
                # Max relative per-step loss difference between the
                # single-process and multi-process runs.
                diff = np.max(
                    np.abs(results[0]['loss'] - results[1]['loss'])
                    / np.abs(results[0]['loss'])
                )
                self.assertLess(diff, 1e-2)


class TestStaticDataLoaderReturnList(unittest.TestCase):
    """Check the batch structure a static-mode ``DataLoader`` yields with
    ``return_list=True``, for a single place and for two CPU places."""

    def _check_return_list(self, num_workers, places, nested):
        """Iterate one epoch and validate the structure of every batch.

        Each batch must be a two-item list (image, label). Both items must be
        per-place lists when ``nested`` is true, and must not be lists
        otherwise. ``places=None`` leaves the ``DataLoader`` default in place.
        """
        scope = fluid.Scope()
        image = fluid.data(
            name='image', shape=[None, IMAGE_SIZE], dtype='float32'
        )
        label = fluid.data(name='label', shape=[None, 1], dtype='int64')
        with fluid.scope_guard(scope):
            dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(
                dataset,
                feed_list=[image, label],
                places=places,
                num_workers=num_workers,
                batch_size=BATCH_SIZE,
                drop_last=True,
                return_list=True,
            )

            for d in dataloader:
                assert isinstance(d, list)
                assert len(d) == 2
                assert isinstance(d[0], list) == nested
                assert isinstance(d[1], list) == nested

    def run_single_place(self, num_workers):
        # Default places: each field comes back as a single tensor.
        self._check_return_list(num_workers, places=None, nested=False)

    def run_multi_place(self, num_workers):
        # Two places: each field comes back as a list with one entry per place.
        self._check_return_list(
            num_workers, places=[fluid.CPUPlace()] * 2, nested=True
        )

    def test_main(self):
        paddle.enable_static()
        for num_workers in [0, 2]:
            self.run_single_place(num_workers)
            self.run_multi_place(num_workers)


class RandomBatchedDataset(Dataset):
    """Dataset whose every item is an already-stacked batch of samples.

    Intended for ``DataLoader(batch_size=None)``. Item ``idx`` is generated
    from a RNG seeded with ``idx``, so it is reproducible in any process.

    Args:
        sample_num (int): total sample count. The dataset exposes
            ``sample_num // BATCH_SIZE`` batches, so any remainder is dropped.
        class_num (int): labels are drawn uniformly from ``[0, class_num)``.
    """

    def __init__(self, sample_num, class_num):
        self.sample_num = sample_num // BATCH_SIZE
        self.class_num = class_num

    def __getitem__(self, idx):
        # Local RandomState: same stream as np.random.seed(idx), but the
        # process-wide NumPy RNG is left untouched.
        rng = np.random.RandomState(idx)
        images = []
        labels = []
        for _ in range(BATCH_SIZE):
            images.append(rng.random_sample([IMAGE_SIZE]).astype('float32'))
            # ``high`` is exclusive, so pass ``class_num`` to include the last
            # class.
            labels.append(
                rng.randint(0, self.class_num, (1,)).astype('int64')
            )
        # Returns (BATCH_SIZE, IMAGE_SIZE) float32 and (BATCH_SIZE, 1) int64.
        return np.stack(images, axis=0), np.stack(labels, axis=0)

    def __len__(self):
        return self.sample_num


class TestStaticDataLoaderWithBatchedDataset(TestStaticDataLoader):
    """Same training comparison as ``TestStaticDataLoader``, but the dataset
    already yields whole batches and the loader uses ``batch_size=None``."""

    def run_main(self, num_workers, places, persistent_workers, use_pe=True):
        """Train for ``EPOCH_NUM`` epochs on ``RandomBatchedDataset``.

        Args:
            num_workers (int): ``DataLoader`` worker processes.
            places (list): places the loader feeds. ``places[0]`` hosts the
                executor.
            persistent_workers (bool): forwarded to ``DataLoader``.
            use_pe (bool): run through ``fluid.CompiledProgram`` (the previous
                and default behaviour). Added to match the parent signature.

        Returns:
            dict: ``"time"``, ``"step"`` (steps per epoch) and ``"loss"``
            (np.ndarray with the mean loss of each step).
        """
        scope = fluid.Scope()
        with fluid.scope_guard(scope):
            startup_prog, main_prog, image, label, loss = simple_fc_net_static()

            dataset = RandomBatchedDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(
                dataset,
                feed_list=[image, label],
                places=places,
                num_workers=num_workers,
                # Items are already batches; disable automatic batching.
                batch_size=None,
                return_list=False,
                drop_last=True,
                persistent_workers=persistent_workers,
            )
            assert len(dataloader) == SAMPLE_NUM // BATCH_SIZE

            exe = fluid.Executor(place=places[0])
            exe.run(startup_prog)

            if use_pe:
                prog = fluid.CompiledProgram(main_prog)
                if len(places) > 1:
                    prog = prog.with_data_parallel(
                        loss_name=loss.name, places=places
                    )
            else:
                prog = main_prog

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in range(EPOCH_NUM):
                step = 0
                for d in dataloader:
                    assert len(d) == len(places), "{} != {}".format(
                        len(d), len(places)
                    )
                    for i, item in enumerate(d):
                        image_t = item['image']
                        label_t = item['label']
                        assert image_t.shape() == [BATCH_SIZE, IMAGE_SIZE]
                        assert label_t.shape() == [BATCH_SIZE, 1]
                        assert image_t._place()._equals(places[i])
                        assert label_t._place()._equals(places[i])
                    (step_loss,) = exe.run(
                        program=prog,
                        feed=d,
                        fetch_list=[loss],
                        use_program_cache=True,
                    )
                    loss_list.append(np.mean(step_loss))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list),
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret


# Run all test cases when executed directly as a script.
if __name__ == '__main__':
    unittest.main()