# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import os
import subprocess
import time
import unittest

import paddle.fluid as fluid
from paddle.distributed.utils.launch_utils import (
    TrainerProc,
    find_free_ports,
    get_cluster,
    watch_local_trainers,
)


def get_cluster_from_args(selected_gpus):
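    """Build a single-node (127.0.0.1) cluster/pod description for the test.

    One free port is reserved per selected GPU and paired with the node IP
    to form the trainer endpoints passed to ``get_cluster``.
    """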
    cluster_node_ips = '127.0.0.1'
    node_ip = '127.0.0.1'

    node_ips = [x.strip() for x in cluster_node_ips.split(',')]

    # index() is only called to validate that node_ip belongs to the cluster;
    # it raises ValueError otherwise and the result is not needed here.
    node_ips.index(node_ip)

    free_ports = find_free_ports(len(selected_gpus))
    if free_ports is not None:
        free_ports = list(free_ports)

    trainer_endpoints = []
    for ip in node_ips:
        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)


def get_gpus(selected_gpus):
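    """Parse a comma-separated GPU string such as '0,1' into a list of ids."""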
    selected_gpus = [x.strip() for x in selected_gpus.split(',')]
    return selected_gpus


def start_local_trainers_cpu(
    trainer_endpoints, training_script, training_script_args, log_dir=None
):
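    """Spawn one local CPU trainer process per endpoint using the gloo backend.

    Each process receives its rank, endpoint and the full endpoint list via
    environment variables; a list of ``TrainerProc`` records is returned.
    """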
    current_env = copy.copy(os.environ.copy())
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    n_rank = len(trainer_endpoints)
    print(trainer_endpoints)
    for rank_id, endpoint in enumerate(trainer_endpoints):
        proc_env = {
            "PADDLE_DISTRI_BACKEND": "gloo",
            "PADDLE_TRAINER_ID": "%d" % rank_id,
            "PADDLE_CURRENT_ENDPOINT": "%s" % endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % n_rank,
            "PADDLE_TRAINER_ENDPOINTS": ",".join(trainer_endpoints),
        }

        current_env.update(proc_env)

        print("trainer proc env:{}".format(current_env))

        assert (
            os.getenv('WITH_COVERAGE', 'OFF') == 'OFF'
        ), "Gloo does not support WITH_COVERAGE."
        cmd = "python -u " + training_script

        print("start trainer proc:{} env:{}".format(cmd, proc_env))

        fn = None

        proc = subprocess.Popen(cmd.split(" "), env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = rank_id
        tp.log_fn = fn
        tp.cmd = cmd

        procs.append(tp)

    return procs


def start_local_trainers(
    cluster,
    pod,
    training_script,
    training_script_args,
    eager_mode=True,
    allocator_strategy="auto_growth",
    log_dir=None,
):
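    """Spawn one local GPU trainer process per trainer described in ``pod``.

    Each process receives its selected GPUs, rank, endpoint and the allocator
    strategy via environment variables; a list of ``TrainerProc`` records is
    returned.
    """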
    current_env = copy.copy(os.environ.copy())
    # Paddle broadcasts ncclUniqueId over sockets, and a proxy may make the
    # trainers unreachable, so the proxy variables are deleted. Setting them
    # to "" would make grpc log the error "bad uri", so they are removed
    # rather than cleared.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    for t in pod.trainers:
        proc_env = {
            "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
            "PADDLE_TRAINER_ID": "%d" % t.rank,
            "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
        }

        proc_env["FLAGS_allocator_strategy"] = allocator_strategy
        if allocator_strategy == "auto_growth":
            proc_env["FLAGS_fraction_of_gpu_memory_to_use"] = "0.1"

        current_env.update(proc_env)

        print("trainer proc env:{}".format(current_env))

        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            cmd = "python -m coverage run --branch -p " + training_script
        else:
            cmd = "python -u " + training_script

        print("start trainer proc:{} env:{}".format(cmd, proc_env))

        fn = None

        proc = subprocess.Popen(cmd.split(" "), env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.log_fn = fn
        tp.cmd = cmd

        procs.append(tp)

    return procs


class TestMultipleGpus(unittest.TestCase):
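    """Helper test case that runs a training script on two local GPUs."""
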
    def run_mnist_2gpu(
        self,
        target_file_name,
        eager_mode=True,
        allocator_strategy="auto_growth",
    ):
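        """Launch ``target_file_name`` on GPUs 0 and 1 and wait for completion.

        Returns early when Paddle is not compiled with CUDA or no GPU device
        is visible.
        """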
        if (
            not fluid.core.is_compiled_with_cuda()
            or fluid.core.get_cuda_device_count() == 0
        ):
            return

        selected_gpus = get_gpus('0,1')
        cluster = None
        pod = None

        cluster, pod = get_cluster_from_args(selected_gpus)

        procs = start_local_trainers(
            cluster,
            pod,
            eager_mode=eager_mode,
            allocator_strategy=allocator_strategy,
            training_script=target_file_name,
            training_script_args=[],
        )

        while True:
            alive = watch_local_trainers(procs, cluster.trainers_endpoints())

            if not alive:
                print("Local procs complete, POD info:{}".format(pod))
                break
            time.sleep(3)


class TestMultipleWithGloo(unittest.TestCase):
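    """Helper test case that runs a training script on two local CPU trainers via gloo."""
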
    def run_mnist_2cpu(self, target_file_name):
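        """Launch ``target_file_name`` on two local CPU trainers and wait for completion."""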

        cluster, pod = get_cluster_from_args(
            [0, 1]
        )  # temporary; only needed to get trainer_nranks()

        procs = start_local_trainers_cpu(
            cluster.trainers_endpoints(),
            training_script=target_file_name,
            training_script_args=[],
        )

        while True:
            alive = watch_local_trainers(procs, cluster.trainers_nranks())

            if not alive:
                print("Local procs complete, POD info:{}".format(pod))
                break
            time.sleep(3)


class TestDataParallelGradientCheck(TestMultipleGpus):
    def test_multiple_gpus_dynamic(self):
        self.run_mnist_2gpu(
            'parallel_dygraph_gradient_check.py', eager_mode=False
        )


class TestDataParallelWithPyLayer(TestMultipleGpus):
    def test_parallel_dygraph_dataparallel_with_pylayer(self):
        self.run_mnist_2gpu('parallel_dygraph_dataparallel_with_pylayer.py')
        self.run_mnist_2gpu(
            'parallel_dygraph_dataparallel_with_pylayer.py', eager_mode=False
        )
        self.run_mnist_2gpu(
            'parallel_dygraph_dataparallel_with_pylayer.py',
            allocator_strategy="naive_best_fit",
        )


class TestGradientCheckInEagerMode(TestMultipleGpus):
    def test_multiple_gpus_dynamic(self):
        self.run_mnist_2gpu('parallel_dygraph_gradient_check_in_eager_mode.py')


if __name__ == "__main__":
    unittest.main()