# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import time
import unittest

import numpy as np
from test_multiprocess_dataloader_static import (
    BATCH_SIZE,
    CLASS_NUM,
    EPOCH_NUM,
    IMAGE_SIZE,
    SAMPLE_NUM,
    RandomBatchedDataset,
    RandomDataset,
    prepare_places,
)

import paddle
import paddle.fluid as fluid
from paddle.io import DataLoader
from paddle.nn import Linear


class SimpleFCNet(fluid.dygraph.Layer):
    """Small fully-connected classifier used by the DataLoader tests.

    Stacks three hidden Linear layers (10, 20, 30 units), each followed
    by Tanh, then a final Linear projecting to CLASS_NUM and a Softmax.
    Every weight is initialized to the constant 0.8 and every bias to
    0.5 so that separate runs produce bit-comparable loss curves.
    """

    def __init__(self):
        super().__init__()

        # Constant initializers keep the network deterministic across runs.
        weight_init = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Constant(value=0.8)
        )
        bias_init = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Constant(value=0.5)
        )

        # NOTE(review): layers live in a plain Python list, not a
        # LayerList, so they are presumably not registered as sublayers —
        # looks intentional for this determinism test; confirm if
        # parameters() is expected to include them.
        self._fcs = []
        prev_size = IMAGE_SIZE
        for width in (10, 20, 30):
            self._fcs.append(
                Linear(
                    prev_size,
                    width,
                    weight_attr=weight_init,
                    bias_attr=bias_init,
                )
            )
            self._fcs.append(paddle.nn.Tanh())
            prev_size = width
        self._fcs.append(
            Linear(
                prev_size,
                CLASS_NUM,
                weight_attr=weight_init,
                bias_attr=bias_init,
            )
        )
        self._fcs.append(paddle.nn.Softmax())

    def forward(self, image):
        """Run `image` through every stored layer in order."""
        hidden = image
        for layer in self._fcs:
            hidden = layer(hidden)
        return hidden


class TestDygraphDataLoader(unittest.TestCase):
    """Verify multi-worker dygraph DataLoader matches single-process runs.

    ``run_main`` trains SimpleFCNet for EPOCH_NUM epochs and records the
    per-step losses; ``test_main`` repeats that with num_workers=0 and 2
    (and persistent_workers False/True) and asserts the two loss curves
    agree to within 1%.
    """

    def run_main(self, num_workers, places, persistent_workers):
        """Train once and return timing/step/loss statistics.

        Args:
            num_workers: worker-subprocess count for the DataLoader.
            places: list of places; only places[0] is used in dygraph.
            persistent_workers: keep workers alive across epochs.

        Returns:
            dict with keys "time" (seconds), "step" (steps per epoch)
            and "loss" (np.ndarray of per-step mean losses).
        """
        # Fixed seeds so both DataLoader configurations see identical data.
        fluid.default_startup_program().random_seed = 1
        fluid.default_main_program().random_seed = 1
        with fluid.dygraph.guard(places[0]):
            fc_net = SimpleFCNet()
            optimizer = fluid.optimizer.Adam(parameter_list=fc_net.parameters())

            dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
            dataloader = DataLoader(
                dataset,
                num_workers=num_workers,
                batch_size=BATCH_SIZE,
                drop_last=True,
                persistent_workers=persistent_workers,
            )
            # drop_last=True => exactly SAMPLE_NUM // BATCH_SIZE batches.
            # Use a unittest assertion rather than a bare `assert` so the
            # check survives `python -O` and reports a useful message.
            self.assertEqual(len(dataloader), SAMPLE_NUM // BATCH_SIZE)

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in range(EPOCH_NUM):
                step = 0
                for image, label in dataloader():
                    out = fc_net(image)
                    loss = paddle.nn.functional.cross_entropy(
                        out, label, reduction='none', use_softmax=False
                    )
                    avg_loss = paddle.mean(loss)
                    avg_loss.backward()
                    optimizer.minimize(avg_loss)
                    fc_net.clear_gradients()

                    loss_list.append(np.mean(avg_loss.numpy()))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list),
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret

    def test_main(self):
        # dynamic graph do not run with_data_parallel
        for p in prepare_places(False):
            for persistent_workers in [False, True]:
                results = []
                for num_workers in [0, 2]:
                    print(
                        self.__class__.__name__,
                        p,
                        num_workers,
                        persistent_workers,
                    )
                    sys.stdout.flush()
                    ret = self.run_main(
                        num_workers=num_workers,
                        places=p,
                        persistent_workers=persistent_workers,
                    )
                    results.append(ret)
                # Max relative loss difference between the 0-worker and
                # 2-worker runs must stay under 1%.
                diff = np.max(
                    np.abs(results[0]['loss'] - results[1]['loss'])
                    / np.abs(results[0]['loss'])
                )
                self.assertLess(diff, 1e-2)


class TestDygraphDataLoaderWithBatchedDataset(TestDygraphDataLoader):
    """Same worker-consistency check, but with a pre-batched dataset.

    Overrides ``run_main`` to read from RandomBatchedDataset with
    ``batch_size=None`` (the dataset yields whole batches itself);
    the inherited ``test_main`` drives the comparison.
    """

    def run_main(self, num_workers, places, persistent_workers):
        """Train once on RandomBatchedDataset and return statistics.

        Args:
            num_workers: worker-subprocess count for the DataLoader.
            places: list of places; only places[0] is used in dygraph.
            persistent_workers: keep workers alive across epochs.

        Returns:
            dict with keys "time" (seconds), "step" (steps per epoch)
            and "loss" (np.ndarray of per-step mean losses).
        """
        # Fixed seeds so both DataLoader configurations see identical data.
        fluid.default_startup_program().random_seed = 1
        fluid.default_main_program().random_seed = 1
        with fluid.dygraph.guard(places[0]):
            fc_net = SimpleFCNet()
            optimizer = fluid.optimizer.Adam(parameter_list=fc_net.parameters())

            dataset = RandomBatchedDataset(SAMPLE_NUM, CLASS_NUM)
            # batch_size=None: the dataset already yields full batches.
            dataloader = DataLoader(
                dataset,
                num_workers=num_workers,
                batch_size=None,
                drop_last=True,
                persistent_workers=persistent_workers,
            )
            # Use a unittest assertion rather than a bare `assert` so the
            # check survives `python -O` and reports a useful message.
            self.assertEqual(len(dataloader), SAMPLE_NUM // BATCH_SIZE)

            step_list = []
            loss_list = []
            start_t = time.time()
            for _ in range(EPOCH_NUM):
                step = 0
                for image, label in dataloader():
                    out = fc_net(image)
                    loss = paddle.nn.functional.cross_entropy(
                        out, label, reduction='none', use_softmax=False
                    )
                    avg_loss = paddle.mean(loss)
                    avg_loss.backward()
                    optimizer.minimize(avg_loss)
                    fc_net.clear_gradients()

                    loss_list.append(np.mean(avg_loss.numpy()))
                    step += 1
                step_list.append(step)

        end_t = time.time()
        ret = {
            "time": end_t - start_t,
            "step": step_list,
            "loss": np.array(loss_list),
        }
        print("time cost", ret['time'], 'step_list', ret['step'])
        return ret


# Allow running this test file directly as a script.
if __name__ == '__main__':
    unittest.main()