test_parallel_dygraph_dataparallel.py 6.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import time
import paddle.fluid as fluid
18 19 20
import copy
import os
import subprocess
21

22 23 24 25 26 27
from paddle.distributed.utils.launch_utils import (
    find_free_ports,
    watch_local_trainers,
    get_cluster,
    TrainerProc,
)
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49


def get_cluster_from_args(selected_gpus):
    """Build a single-node cluster description for local multi-process tests.

    All trainers are placed on ``127.0.0.1`` and one free port is requested
    per entry of ``selected_gpus``.

    Args:
        selected_gpus: Sequence of device identifiers. Its length decides how
            many ports (and therefore endpoints) are requested.

    Returns:
        The result of ``get_cluster`` for the node list, the current node,
        the per-node endpoint lists and ``selected_gpus``. Callers in this
        file unpack it as ``cluster, pod``.

    Raises:
        RuntimeError: If ``find_free_ports`` returns ``None`` instead of a
            collection of ports.
    """
    node_ip = '127.0.0.1'
    node_ips = [node_ip]

    free_ports = find_free_ports(len(selected_gpus))
    if free_ports is None:
        # Fail with a clear message instead of a TypeError from iterating
        # None further down.
        raise RuntimeError(
            "find_free_ports could not reserve %d free port(s)"
            % len(selected_gpus)
        )
    free_ports = list(free_ports)

    # One endpoint list per node, one endpoint per reserved port.
    trainer_endpoints = [
        ["%s:%d" % (ip, port) for port in free_ports] for ip in node_ips
    ]
    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)


S
ShenLiang 已提交
50 51 52 53 54
def get_gpus(selected_gpus):
    """Split a comma-separated device string into stripped string tokens.

    Args:
        selected_gpus: Comma-separated string such as ``'0,1'``.

    Returns:
        A list with one whitespace-stripped string per comma-separated field.
    """
    fields = selected_gpus.split(',')
    return list(map(str.strip, fields))


55 56 57
def start_local_trainers_cpu(
    trainer_endpoints, training_script, training_script_args, log_dir=None
):
    """Spawn one gloo-backend trainer subprocess per endpoint.

    Args:
        trainer_endpoints: List of endpoint strings. The position of an
            endpoint in the list is used as that trainer's rank.
        training_script: Path of the Python script every trainer runs.
        training_script_args: Extra command-line arguments appended after the
            script path. May be empty or ``None``.
        log_dir: Currently unused; kept so the signature stays compatible.

    Returns:
        A list of ``TrainerProc`` objects, one per endpoint, in rank order.

    Raises:
        AssertionError: If the ``WITH_COVERAGE`` environment variable is set
            to anything other than ``OFF`` while a trainer is being started.
    """
    base_env = os.environ.copy()
    # Proxy settings are removed from the environment handed to the trainers.
    base_env.pop("http_proxy", None)
    base_env.pop("https_proxy", None)

    script_args = [str(arg) for arg in (training_script_args or [])]

    procs = []
    n_rank = len(trainer_endpoints)
    print(trainer_endpoints)
    for rank_id, endpoint in enumerate(trainer_endpoints):
        proc_env = {
            "PADDLE_DISTRI_BACKEND": "gloo",
            "PADDLE_TRAINER_ID": "%d" % rank_id,
            "PADDLE_CURRENT_ENDPOINT": "%s" % endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % n_rank,
            "PADDLE_TRAINER_ENDPOINTS": ",".join(trainer_endpoints),
        }

        # Each trainer gets its own environment dict rather than sharing one
        # that is mutated on every iteration.
        current_env = {**base_env, **proc_env}

        print("trainer proc env:{}".format(current_env))

        assert (
            os.getenv('WITH_COVERAGE', 'OFF') == 'OFF'
        ), "Gloo don't support WITH_COVERAGE."

        # Build argv as a list so a script path containing spaces is not
        # split into several arguments.
        argv = ["python", "-u", training_script] + script_args
        cmd = " ".join(argv)

        print("start trainer proc:{} env:{}".format(cmd, proc_env))

        proc = subprocess.Popen(argv, env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = rank_id
        tp.log_fn = None  # no per-trainer log file is opened here
        tp.cmd = cmd

        procs.append(tp)

    return procs


100 101 102 103 104 105 106 107
def start_local_trainers(
    cluster,
    pod,
    training_script,
    training_script_args,
    eager_mode=True,
    log_dir=None,
):
    """Spawn one trainer subprocess for every trainer described by ``pod``.

    Args:
        cluster: Object providing ``trainers_nranks()`` and
            ``trainers_endpoints()``.
        pod: Object whose ``trainers`` attribute is iterable; each trainer
            exposes ``gpus``, ``rank`` and ``endpoint``.
        training_script: Path of the Python script every trainer runs.
        training_script_args: Extra command-line arguments appended after the
            script path. May be empty or ``None``.
        eager_mode: When false, ``FLAGS_enable_eager_mode=0`` is added to the
            trainer environment.
        log_dir: Currently unused; kept so the signature stays compatible.

    Returns:
        A list of ``TrainerProc`` objects, one per trainer in ``pod``.
    """
    base_env = os.environ.copy()
    # Paddle broadcasts the ncclUniqueId over a socket, and a proxy may make
    # the trainers unreachable. The proxy variables are removed rather than
    # set to "", because an empty value makes grpc log a "bad uri" error.
    base_env.pop("http_proxy", None)
    base_env.pop("https_proxy", None)

    script_args = [str(arg) for arg in (training_script_args or [])]

    # The launcher prefix depends only on the environment, so pick it once.
    if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
        launcher = ["python", "-m", "coverage", "run", "--branch", "-p"]
    else:
        launcher = ["python", "-u"]

    procs = []
    for t in pod.trainers:
        proc_env = {
            "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
            "PADDLE_TRAINER_ID": "%d" % t.rank,
            "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
        }

        if not eager_mode:
            proc_env["FLAGS_enable_eager_mode"] = "%d" % 0

        # Each trainer gets its own environment dict rather than sharing one
        # that is mutated on every iteration.
        current_env = {**base_env, **proc_env}

        print("trainer proc env:{}".format(current_env))

        # Build argv as a list so a script path containing spaces is not
        # split into several arguments.
        argv = launcher + [training_script] + script_args
        cmd = " ".join(argv)

        print("start trainer proc:{} env:{}".format(cmd, proc_env))

        proc = subprocess.Popen(argv, env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.log_fn = None  # no per-trainer log file is opened here
        tp.cmd = cmd

        procs.append(tp)

    return procs


155
class TestMultipleGpus(unittest.TestCase):
    """Base test case with a helper that runs a script on two local GPUs."""

    def run_mnist_2gpu(self, target_file_name, eager_mode=True):
        """Launch ``target_file_name`` as two local trainers and wait for them.

        Returns immediately, without launching anything, when Paddle is not
        compiled with CUDA or no CUDA device is present.

        Args:
            target_file_name: Path of the training script to run.
            eager_mode: Forwarded to ``start_local_trainers``.
        """
        if (
            not fluid.core.is_compiled_with_cuda()
            or fluid.core.get_cuda_device_count() == 0
        ):
            return

        selected_gpus = get_gpus('0,1')
        cluster, pod = get_cluster_from_args(selected_gpus)

        procs = start_local_trainers(
            cluster,
            pod,
            eager_mode=eager_mode,
            training_script=target_file_name,
            training_script_args=[],
        )

        # Poll every 3 seconds until no trainer is reported alive. The rank
        # count is passed, matching the call in TestMultipleWithGloo.
        while True:
            alive = watch_local_trainers(procs, cluster.trainers_nranks())

            if not alive:
                print("Local procs complete, POD info:{}".format(pod))
                break
            time.sleep(3)


class TestMultipleWithGloo(unittest.TestCase):
    """Base test case with a helper that runs a script as two gloo trainers."""

    def run_mnist_2cpu(self, target_file_name):
        """Launch ``target_file_name`` as two CPU trainers and wait for them.

        Args:
            target_file_name: Path of the training script to run.
        """
        # Temporary: the two-entry device list is passed only to obtain a
        # cluster object that supplies the endpoints and the rank count.
        cluster, pod = get_cluster_from_args([0, 1])

        procs = start_local_trainers_cpu(
            cluster.trainers_endpoints(),
            training_script=target_file_name,
            training_script_args=[],
        )

        # Poll every 3 seconds until no trainer is reported alive.
        while True:
            alive = watch_local_trainers(procs, cluster.trainers_nranks())

            if not alive:
                print("Local procs complete, POD info:{}".format(pod))
                break
            time.sleep(3)

J
JZ-LIANG 已提交
207 208

class TestDataParallelGradientCheck(TestMultipleGpus):
    """Runs the gradient-check script on two GPUs with eager mode disabled."""

    def test_multiple_gpus_dynamic(self):
        self.run_mnist_2gpu(
            'parallel_dygraph_gradient_check.py', eager_mode=False
        )
213 214


215 216 217
class TestDataParallelWithPyLayer(TestMultipleGpus):
    """Runs the PyLayer data-parallel script with and without eager mode."""

    def test_parallel_dygraph_dataparallel_with_pylayer(self):
        # First run uses the helper's default (eager_mode=True).
        self.run_mnist_2gpu('parallel_dygraph_dataparallel_with_pylayer.py')
        # Second run repeats the same script with eager mode disabled.
        self.run_mnist_2gpu(
            'parallel_dygraph_dataparallel_with_pylayer.py', eager_mode=False
        )
221 222


223 224 225 226 227
class TestGradientCheckInEagerMode(TestMultipleGpus):
    """Runs the eager-mode gradient-check script via the two-GPU helper."""

    def test_multiple_gpus_dynamic(self):
        script = 'parallel_dygraph_gradient_check_in_eager_mode.py'
        self.run_mnist_2gpu(script)


228
if __name__ == "__main__":
    # Set the eager-mode flag in this process's environment before the tests
    # run; trainer subprocesses start from a copy of os.environ.
    os.environ["FLAGS_enable_eager_mode"] = "1"
    unittest.main()