# File: parallel_dygraph_no_sync_control_flow.py
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
import contextlib
import unittest
import numpy as np
import six
import pickle
import random

import paddle
import paddle.fluid as fluid
import paddle.distributed as dist
import paddle.fluid.dygraph as dygraph
from paddle.fluid import core
from paddle.fluid.dygraph.nn import Linear
31 32
from test_dist_base import runtime_main
from parallel_dygraph_no_sync import TestNoSync
33 34 35 36 37 38 39 40

# Module-level settings for this test script.
# NOTE(review): `seed` and `RUN_STEP` are not referenced by the code visible
# in this file; presumably kept for parity with related tests -- confirm.
seed = 90
RUN_STEP = 20
# Number of samples per batch handed to paddle.batch in get_model().
batch_size = 4
# Number of samples produced by one pass over the fake reader.
batch_num = 1000


class SimpleNetControlFlow(fluid.Layer):
    """Three stacked ``Linear`` layers whose gradient flow changes over time.

    The layer counts its own forward calls in ``self.step``. During the
    first ten calls the input simply flows through
    ``net_a -> net_b -> net_c``. From the eleventh call on, the output of
    ``net_a`` is additionally flagged with ``stop_gradient = True`` before
    it is passed to ``net_b``.
    """

    def __init__(self):
        super(SimpleNetControlFlow, self).__init__()
        # Feature sizes chain as 10 -> 20 -> 5 -> 10.
        self.net_a = Linear(input_dim=10, output_dim=20)
        self.net_b = Linear(input_dim=20, output_dim=5)
        self.net_c = Linear(input_dim=5, output_dim=10)
        # Number of forward passes executed so far.
        self.step = 0

    def forward(self, x):
        """Run one forward pass and advance the step counter.

        Args:
            x: Input fed to ``net_a`` (which is built with
                ``input_dim=10``).

        Returns:
            The output of ``net_c``.
        """
        self.step = self.step + 1
        x = self.net_a(x)
        if self.step > 10:
            # NOTE(review): presumably this detaches net_a from the backward
            # pass on later steps so the set of parameters receiving
            # gradients changes mid-training -- confirm against TestNoSync.
            x.stop_gradient = True
        x = self.net_b(x)
        x = self.net_c(x)
        return x


59
class TestNoSyncControlFlow(TestNoSync):
    """``TestNoSync`` scenario that trains ``SimpleNetControlFlow``."""

    def get_model(self):
        """Build the model, the batched reader and the optimizer.

        Returns:
            A ``(model, train_reader, optimizer)`` tuple where ``model`` is
            a fresh ``SimpleNetControlFlow``, ``train_reader`` wraps
            ``fake_sample_reader()`` with ``paddle.batch`` (module-level
            ``batch_size``, ``drop_last=True``) and ``optimizer`` is an SGD
            optimizer with learning rate 0.001 over the model's parameters.
        """
        model = SimpleNetControlFlow()
        train_reader = paddle.batch(fake_sample_reader(),
                                    batch_size=batch_size,
                                    drop_last=True)
        optimizer = paddle.optimizer.SGD(learning_rate=0.001,
                                         parameters=model.parameters())
        return model, train_reader, optimizer

    def run_one_loop(self, model, optimizer, batch):
        """Compute the loss for a single batch.

        Args:
            model: Callable applied to the input tensor.
            optimizer: Not used by this method; kept in the signature.
            batch: Sized collection of samples; it is stacked into an array
                and reshaped to ``(-1, 10)``.

        Returns:
            The sum of the model output divided by ``len(batch)``.
        """
        x_data = np.array(list(batch)).reshape((-1, 10))
        x = paddle.to_tensor(x_data)
        out = model(x)
        loss = out.sum() / len(batch)
        return loss


def fake_sample_reader():
    """Create a reader that yields random ``float32`` samples.

    Returns:
        A zero-argument callable. Each call returns a generator producing
        ``batch_num`` (module-level constant) arrays of shape ``(10,)`` and
        dtype ``float32`` drawn with ``np.random.random_sample``.
    """

    def __reader__():
        for _ in range(batch_num):
            yield np.random.random_sample((10, )).astype('float32')

    return __reader__


if __name__ == "__main__":
    # When executed as a script, hand the test class to runtime_main
    # (imported from test_dist_base), which drives the run.
    runtime_main(TestNoSyncControlFlow)