Unverified commit f064ead6, authored by Roc, committed by GitHub

[CI] CI for Distributed (#44085)

Parent commit: 16c7c96e
@@ -243,6 +243,7 @@ include(simd)
option(WITH_AVX "Compile PaddlePaddle with AVX intrinsics" ${AVX_FOUND})
option(WITH_PYTHON "Compile PaddlePaddle with python interpreter" ON)
option(WITH_TESTING "Compile PaddlePaddle with unit testing" OFF)
option(WITH_MULTINODE_TESTING "Test multinode apis and ops" OFF)
option(WITH_MKL "Compile PaddlePaddle with MKL support." ${AVX_FOUND})
option(WITH_SYSTEM_BLAS "Use system blas library" OFF)
option(WITH_DISTRIBUTE "Compile with distributed support" OFF)
...
@@ -7,6 +7,12 @@ set(GC_ENVS FLAGS_eager_delete_tensor_gb=0.0 FLAGS_fast_eager_deletion_mode=1
    FLAGS_memory_fraction_of_eager_deletion=1.0)
set(dist_ENVS http_proxy="" https_proxy="")
file(
GLOB MULTINODE_DIST_TEST_OPS
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"test_multinode_*.py")
string(REPLACE ".py" "" MULTINODE_DIST_TEST_OPS "${MULTINODE_DIST_TEST_OPS}")
file(
  GLOB DIST_TEST_OPS
  RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
@@ -78,6 +84,11 @@ list(APPEND DIST_TEST_OPS test_collective_batch_isend_irecv)
list(APPEND DIST_TEST_OPS test_collective_reduce_scatter)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
foreach(TEST_OP ${MULTINODE_DIST_TEST_OPS})
list(APPEND MIXED_DIST_TEST_OPS ${TEST_OP})
endforeach()
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op)
list(APPEND MIXED_DIST_TEST_OPS test_dgc_momentum_op)
list(APPEND MIXED_DIST_TEST_OPS test_dgc_optimizer)
@@ -135,6 +146,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_reshard_mppp)
list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_reshard_dpmppp)
list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_cost_model)
list(APPEND MIXED_DIST_TEST_OPS test_tcp_store)
list(APPEND MIXED_DIST_TEST_OPS test_dygraph_hybrid_dp)
foreach(TEST_OP ${MIXED_DIST_TEST_OPS})
  list(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach()
@@ -958,6 +970,21 @@ if(WITH_DISTRIBUTE)
    PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
  endif()
  # add new multinode distributed tests (each one is driven by multinode_dist_test.sh)
if(WITH_DISTRIBUTE AND WITH_MULTINODE_TESTING)
foreach(TEST_OP ${MULTINODE_DIST_TEST_OPS})
bash_test_modules(
${TEST_OP}
START_BASH
multinode_dist_test.sh
LABELS
"RUN_TYPE=EXCLUSIVE"
ENVS
"PADDLE_DIST_UT_PORT=${dist_ut_port}")
endforeach()
endif()
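  # NOTE(illustrative, assumed usage): the multinode tests above are only registered
  # when the build is configured with both switches enabled, e.g.
  #   cmake .. -DWITH_DISTRIBUTE=ON -DWITH_MULTINODE_TESTING=ON
  # ctest then runs each of them through multinode_dist_test.sh.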
  # port range (20000, 23000) is reserved for dist-ops
  foreach(TEST_OP ${DIST_TEST_OPS})
    bash_test_modules(
...
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.distributed import fleet
def init_parallel_env(mode, global_batch_size, seed=1024):
    '''
    Args:
        mode (str): hybrid parallel mode string such as "DP1-MP1-PP1-SH1-O1",
            giving the DP / MP / PP / sharding degrees and an optimizer stage tag.
        global_batch_size (int): global batch size, used to derive the micro batch
            size when pipeline parallelism is enabled.
        seed (int): seed for tensor parallel initialization. Default: 1024.
    '''
def parse_mode(mode):
assert "DP" == mode[:2]
assert "-MP" in mode
assert "-PP" in mode
assert "-SH" in mode
assert "-O" in mode
modes = mode.split("-")
DP = int(modes[0][2:])
MP = int(modes[1][2:])
PP = int(modes[2][2:])
SH = int(modes[3][2:])
Ostage = int(modes[4][1:])
return DP, MP, PP, SH, Ostage
DP, MP, PP, SH, Ostage = parse_mode(mode)
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
"dp_degree": DP,
"mp_degree": MP,
"pp_degree": PP,
"sharding_degree": SH
}
accumulate_steps = 1
if PP > 1:
strategy.pipeline_configs = {
"accumulate_steps": accumulate_steps,
"micro_batch_size": global_batch_size // DP // accumulate_steps
}
# set control in tensor parallel
strategy.tensor_parallel_configs = {"tensor_init_seed": seed}
fleet.init(is_collective=True, strategy=strategy)
return fleet.get_hybrid_communicate_group()
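# Usage sketch (illustrative; assumes the process was started with
# `python -m paddle.distributed.launch` so that fleet can build the groups):
#
#     hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", global_batch_size=64)
#     dp_group = hcg.get_data_parallel_group()
#
# Here "DP4-MP2-PP2-SH1-O1" maps to dp_degree=4, mp_degree=2, pp_degree=2 and
# sharding_degree=1; the trailing O1 is parsed as the optimizer stage but is not
# consumed by the strategy above.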
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.base import role_maker
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main
class TestDygraphHybridDP(TestCollectiveAPIRunnerBase):
def __init__(self):
pass
def check_pass(self, *args, **kwargs):
from common import init_parallel_env
import paddle
from paddle.distributed import fleet
hcg = init_parallel_env("DP16-MP1-PP1-SH1-O1", 2)
import numpy as np
dp_group = hcg.get_data_parallel_group()
np.random.seed(1024)
data = np.random.random((10 * dp_group.nranks, 100)).reshape(
(dp_group.nranks, -1, 100))
data_part = paddle.to_tensor(data[dp_group.rank])
paddle.distributed.collective.all_reduce(data_part)
data_reduced = data_part
        data_summed = np.sum(data, axis=0)
        assert np.allclose(data_summed,
data_reduced.numpy(),
rtol=1e-8,
atol=1e-8)
if __name__ == "__main__":
    runtime_main(TestDygraphHybridDP, "dp")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.base import role_maker
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main
from paddle import nn
import numpy as np
def weight_init(mp, shape, col=True, seed=1024):
np.random.seed(seed)
w = np.random.normal(0, 0.02, size=shape)
if mp is None:
_w = w
else:
if col:
step = shape[1] // mp.nranks
_w = w[:, mp.rank * step:mp.rank * step + step]
else:
step = shape[0] // mp.nranks
_w = w[mp.rank * step:mp.rank * step + step, :]
return paddle.fluid.initializer.NumpyArrayInitializer(_w)
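# Illustration: with mp.nranks == 2, a column split of a (128, 512) weight gives each
# rank a (128, 256) slice, while a row split of a (512, 128) weight gives each rank a
# (256, 128) slice. ColumnParallelLinear keeps its output sharded (gather_output=False)
# and RowParallelLinear consumes that sharded input (input_is_parallel=True), so the
# pair reproduces the single-card Linear(128, 512) -> Linear(512, 128) computation.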
class Criterion(nn.Layer):
def __init__(self):
super(Criterion, self).__init__()
self.loss_func = nn.MSELoss(reduction="mean")
def forward(self, pred, label):
loss = self.loss_func(pred, label)
return loss
class ModelPipeline(fleet.meta_parallel.PipelineLayer):
def __init__(self, hcg):
paddle.seed(1024)
dp_linear = nn.Linear(32, 128)
self.layers_pp = []
self.topology = hcg.topology()
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group()
for i in range(6):
if mp is not None and mp.nranks > 1:
mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear(
128,
512,
weight_attr=weight_init(mp, (128, 512), True, 1204 + i),
has_bias=True,
gather_output=False)
mp_linear_2 = fleet.meta_parallel.RowParallelLinear(
512,
128,
weight_attr=weight_init(mp, (512, 128), False, 2012 + i),
has_bias=True,
input_is_parallel=True)
else:
mp_linear_1 = nn.Linear(128,
512,
weight_attr=weight_init(
None, (128, 512), True, 1204 + i))
mp_linear_2 = nn.Linear(512,
128,
weight_attr=weight_init(
None, (512, 128), True, 2012 + i))
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(128, 32)
self.layers_pp.append(out)
super(ModelPipeline, self).__init__(layers=self.layers_pp,
loss_fn=Criterion(),
topology=self.topology)
class Model(nn.Layer):
def __init__(self, hcg):
super(Model, self).__init__()
paddle.seed(1024)
dp_linear = nn.Linear(32, 128)
self.layers_pp = []
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group() if hcg else None
for i in range(6):
if mp is not None and mp.nranks > 1:
mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear(
128,
512,
weight_attr=weight_init(mp, (128, 512), True, 1204 + i),
has_bias=True,
gather_output=False)
mp_linear_2 = fleet.meta_parallel.RowParallelLinear(
512,
128,
weight_attr=weight_init(mp, (512, 128), False, 2012 + i),
has_bias=True,
input_is_parallel=True)
else:
mp_linear_1 = nn.Linear(128,
512,
weight_attr=weight_init(
None, (128, 512), True, 1204 + i))
mp_linear_2 = nn.Linear(512,
128,
weight_attr=weight_init(
None, (512, 128), True, 2012 + i))
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(128, 32)
self.layers_pp.append(out)
self.layers = nn.Sequential(*self.layers_pp)
def forward(self, x):
return self.layers(x)
class TestDygraphHybridDPPPMP(TestCollectiveAPIRunnerBase):
def __init__(self):
pass
def check_pass(self, *args, **kwargs):
from common import init_parallel_env
import paddle
from paddle.distributed import fleet
hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64)
pp_degree = hcg.get_pipe_parallel_world_size()
import numpy as np
crit = Criterion()
if pp_degree <= 1:
model = Model(hcg)
else:
model = ModelPipeline(hcg)
model_base = Model(None)
optimizer = paddle.optimizer.Adam(learning_rate=0.01,
parameters=model.parameters())
optimizer_base = paddle.optimizer.Adam(
learning_rate=0.01, parameters=model_base.parameters())
model = fleet.distributed_model(model)
optimizer = fleet.distributed_optimizer(optimizer)
loss_hybrid_arr = []
loss_base_arr = []
x = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
y = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
for _ in range(5):
if pp_degree > 1:
loss = model.train_batch([x, y], optimizer=optimizer)
else:
output = model(x)
loss = crit(output, y)
loss.backward()
optimizer.step()
optimizer.clear_grad()
# baseline loss
output_base = model_base(x)
loss_base = crit(output_base, y)
loss_base.backward()
optimizer_base.step()
optimizer_base.clear_grad()
loss_base_arr.append(loss_base.numpy())
loss_hybrid_arr.append(loss.numpy())
assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-5, atol=1e-5)
if __name__ == "__main__":
    runtime_main(TestDygraphHybridDPPPMP, "dpppmp")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.base import role_maker
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main
from paddle import nn
import numpy as np
def weight_init(mp, shape, col=True, seed=1024):
np.random.seed(seed)
w = np.random.normal(0, 0.02, size=shape)
if mp is None:
_w = w
else:
if col:
step = shape[1] // mp.nranks
_w = w[:, mp.rank * step:mp.rank * step + step]
else:
step = shape[0] // mp.nranks
_w = w[mp.rank * step:mp.rank * step + step, :]
return paddle.fluid.initializer.NumpyArrayInitializer(_w)
class Criterion(nn.Layer):
def __init__(self):
super(Criterion, self).__init__()
self.loss_func = nn.MSELoss(reduction="mean")
def forward(self, pred, label):
loss = self.loss_func(pred, label)
return loss
class ModelPipeline(fleet.meta_parallel.PipelineLayer):
def __init__(self, hcg):
paddle.seed(1024)
dp_linear = nn.Linear(32, 128)
self.layers_pp = []
self.topology = hcg.topology()
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group()
for i in range(6):
if mp is not None and mp.nranks > 1:
mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear(
128,
512,
weight_attr=weight_init(mp, (128, 512), True, 1204 + i),
has_bias=True,
gather_output=False)
mp_linear_2 = fleet.meta_parallel.RowParallelLinear(
512,
128,
weight_attr=weight_init(mp, (512, 128), False, 2012 + i),
has_bias=True,
input_is_parallel=True)
else:
mp_linear_1 = nn.Linear(128,
512,
weight_attr=weight_init(
None, (128, 512), True, 1204 + i))
mp_linear_2 = nn.Linear(512,
128,
weight_attr=weight_init(
None, (512, 128), True, 2012 + i))
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(128, 32)
self.layers_pp.append(out)
super(ModelPipeline, self).__init__(layers=self.layers_pp,
loss_fn=Criterion(),
topology=self.topology)
class Model(nn.Layer):
def __init__(self, hcg):
super(Model, self).__init__()
paddle.seed(1024)
dp_linear = nn.Linear(32, 128)
self.layers_pp = []
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group() if hcg else None
for i in range(6):
if mp is not None and mp.nranks > 1:
mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear(
128,
512,
weight_attr=weight_init(mp, (128, 512), True, 1204 + i),
has_bias=True,
gather_output=False)
mp_linear_2 = fleet.meta_parallel.RowParallelLinear(
512,
128,
weight_attr=weight_init(mp, (512, 128), False, 2012 + i),
has_bias=True,
input_is_parallel=True)
else:
mp_linear_1 = nn.Linear(128,
512,
weight_attr=weight_init(
None, (128, 512), True, 1204 + i))
mp_linear_2 = nn.Linear(512,
128,
weight_attr=weight_init(
None, (512, 128), True, 2012 + i))
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(128, 32)
self.layers_pp.append(out)
self.layers = nn.Sequential(*self.layers_pp)
def forward(self, x):
return self.layers(x)
class TestDygraphHybridFp16(TestCollectiveAPIRunnerBase):
def __init__(self):
pass
def check_pass(self, *args, **kwargs):
from common import init_parallel_env
import paddle
from paddle.distributed import fleet
hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64)
pp_degree = hcg.get_pipe_parallel_world_size()
import numpy as np
crit = Criterion()
if pp_degree <= 1:
model = Model(hcg)
else:
model = ModelPipeline(hcg)
model_base = Model(None)
optimizer = paddle.optimizer.Adam(learning_rate=0.01,
parameters=model.parameters(),
multi_precision=True)
optimizer_base = paddle.optimizer.Adam(
learning_rate=0.01, parameters=model_base.parameters())
scaler = paddle.amp.GradScaler(init_loss_scaling=4096)
scaler = fleet.distributed_scaler(scaler)
model = paddle.amp.decorate(models=model,
level='O2',
save_dtype='float32')
model = fleet.distributed_model(model)
optimizer = fleet.distributed_optimizer(optimizer)
loss_hybrid_arr = []
loss_base_arr = []
x = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
y = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
for _ in range(2):
if pp_degree > 1:
with paddle.amp.auto_cast(True, level='O2'):
loss = model.train_batch([x, y],
optimizer=optimizer,
scaler=scaler)
else:
with paddle.amp.auto_cast(True, level='O2'):
output = model(x)
loss = crit(output, y)
scaler.scale(loss).backward()
scaler.minimize(optimizer, loss)
optimizer.clear_grad()
# baseline loss
with paddle.amp.auto_cast(True, level='O2'):
output_base = model_base(x)
loss_base = crit(output_base, y)
loss_base.backward()
optimizer_base.step()
optimizer_base.clear_grad()
loss_base_arr.append(loss_base.numpy())
loss_hybrid_arr.append(loss)
assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-3, atol=1e-3)
if __name__ == "__main__":
runtime_main(TestDygraphHybridFp16, "dpppmp")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.base import role_maker
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main
from paddle import nn
import numpy as np
from paddle.distributed.fleet.utils import recompute
def weight_init(mp, shape, col=True, seed=1024):
np.random.seed(seed)
w = np.random.normal(0, 0.02, size=shape)
if mp is None:
_w = w
else:
if col:
step = shape[1] // mp.nranks
_w = w[:, mp.rank * step:mp.rank * step + step]
else:
step = shape[0] // mp.nranks
_w = w[mp.rank * step:mp.rank * step + step, :]
return paddle.fluid.initializer.NumpyArrayInitializer(_w)
class Criterion(nn.Layer):
def __init__(self):
super(Criterion, self).__init__()
self.loss_func = nn.MSELoss(reduction="mean")
def forward(self, pred, label):
loss = self.loss_func(pred, label)
return loss
class RecomputeMatmulBlock(nn.Layer):
def __init__(self, mp, seed, m, n, k):
super(RecomputeMatmulBlock, self).__init__()
self.mp = mp
if mp is not None and mp.nranks > 1:
mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear(
m,
n,
weight_attr=weight_init(mp, (m, n), True, seed),
has_bias=True,
gather_output=False)
mp_linear_2 = fleet.meta_parallel.RowParallelLinear(
n,
k,
weight_attr=weight_init(mp, (n, k), False, seed + 1),
has_bias=True,
input_is_parallel=True)
else:
mp_linear_1 = nn.Linear(m,
n,
weight_attr=weight_init(
None, (m, n), True, seed))
mp_linear_2 = nn.Linear(n,
k,
weight_attr=weight_init(
None, (n, k), True, seed + 1))
self.layers = nn.Sequential(mp_linear_1, mp_linear_2)
def forward(self, x):
if self.mp:
return recompute(self.layers, x)
else:
return self.layers(x)
RecomputeBlock = RecomputeMatmulBlock
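# NOTE: fleet.utils.recompute applies activation checkpointing to self.layers above:
# intermediate activations inside the block are discarded during the forward pass and
# recomputed during backward, trading extra compute for lower peak memory.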
class ModelPipeline(fleet.meta_parallel.PipelineLayer):
def __init__(self, hcg):
paddle.seed(1024)
dp_linear = nn.Linear(32, 64)
self.layers_pp = []
self.topology = hcg.topology()
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group()
for i in range(6):
mp_layer = RecomputeBlock(mp, 1024 + i, 64, 128, 64)
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_layer, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(64, 32)
self.layers_pp.append(out)
super(ModelPipeline, self).__init__(layers=self.layers_pp,
loss_fn=Criterion(),
topology=self.topology)
class Model(nn.Layer):
def __init__(self, hcg):
super(Model, self).__init__()
paddle.seed(1024)
dp_linear = nn.Linear(32, 64)
self.layers_pp = []
self.layers_pp.append(dp_linear)
mp = hcg.get_model_parallel_group() if hcg else None
for i in range(6):
mp_layer = RecomputeBlock(mp, 1024 + i, 64, 128, 64)
act = nn.ReLU6()
layer_seq = nn.Sequential(mp_layer, act)
self.layers_pp.append(layer_seq)
out = nn.Linear(64, 32)
self.layers_pp.append(out)
self.layers = nn.Sequential(*self.layers_pp)
def forward(self, x):
return self.layers(x)
class TestDygraphHybridRecompute(TestCollectiveAPIRunnerBase):
def __init__(self):
pass
def check_pass(self, *args, **kwargs):
from common import init_parallel_env
import paddle
from paddle.distributed import fleet
hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64)
pp_degree = hcg.get_pipe_parallel_world_size()
import numpy as np
crit = Criterion()
if pp_degree <= 1:
model = Model(hcg)
else:
model = ModelPipeline(hcg)
model_base = Model(None)
optimizer = paddle.optimizer.Adam(learning_rate=0.01,
parameters=model.parameters())
optimizer_base = paddle.optimizer.Adam(
learning_rate=0.01, parameters=model_base.parameters())
model = fleet.distributed_model(model)
optimizer = fleet.distributed_optimizer(optimizer)
loss_hybrid_arr = []
loss_base_arr = []
x = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
y = paddle.to_tensor(np.random.random((16, 32))).astype("float32")
for _ in range(5):
if pp_degree > 1:
loss = model.train_batch([x, y], optimizer=optimizer)
else:
output = model(x)
loss = crit(output, y)
loss.backward()
optimizer.step()
optimizer.clear_grad()
# baseline loss
output_base = model_base(x)
loss_base = crit(output_base, y)
loss_base.backward()
optimizer_base.step()
optimizer_base.clear_grad()
loss_base_arr.append(loss_base.numpy())
loss_hybrid_arr.append(loss)
assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-5, atol=1e-5)
if __name__ == "__main__":
    runtime_main(TestDygraphHybridRecompute, "dpppmp")
# -*- coding: UTF-8 -*-
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import numpy as np
import argparse
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
epoch = 10
paddle.seed(2022)
np.random.seed(2022)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, linear_size)
self._linear4 = Linear(linear_size, linear_size)
self._linear5 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
y = self._linear4(y)
y = self._linear5(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.Momentum(
parameters=[{
"params": list(model.parameters())
}] if opt_group else list(model.parameters()),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model,
sharding_stage,
use_pure_fp16=False,
accumulate_grad=False,
batch_size=100,
opt_group=False,
sync_comm=False,
test_minimize=False,
save_model=False):
group = paddle.distributed.new_group(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
if opt_group:
optimizer = optimizer_setting(model=model,
use_pure_fp16=use_pure_fp16,
opt_group=opt_group)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if use_pure_fp16:
model = paddle.amp.decorate(models=model,
level='O2',
save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = GroupShardedScaler(scaler)
if sharding_stage == 2:
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list, optim=optimizer, group=group)
model = GroupShardedStage2(model,
optimizer,
group=group,
buffer_max_size=2**21)
elif sharding_stage == 3:
model = GroupShardedStage3(model,
optimizer=optimizer,
group=group,
sync_comm=sync_comm,
segment_size=2**15)
# check optimizer.minimize() error
if test_minimize:
try:
optimizer.minimize()
except:
print(
"====== Find sharding_stage3_optimizer.minimize() error ======")
return
train_reader = paddle.batch(reader_decorator(),
batch_size=batch_size,
drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(input=out,
label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
if not use_pure_fp16:
avg_loss.backward()
else:
scaler.scale(avg_loss).backward()
if not accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if sharding_stage == 3:
model.get_all_parameters()
if save_model:
return model, optimizer
return model.parameters()
def test_stage2_stage3():
paddle.distributed.init_parallel_env()
mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = MLP(
), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP()
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
mlp7.set_state_dict(state_dict)
mlp8.set_state_dict(state_dict)
mlp9.set_state_dict(state_dict)
mlp10.set_state_dict(state_dict)
# fp32
stage2_params = train_mlp(mlp1,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False)
stage3_params = train_mlp(mlp2,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False)
for i in range(len(stage2_params)):
np.testing.assert_allclose(stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-6,
atol=1e-6)
# fp32 accumulate grad
stage3_params = train_mlp(mlp3,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
opt_group=True)
stage3_params_add = train_mlp(mlp4,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
batch_size=20,
opt_group=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(stage3_params[i].numpy(),
stage3_params_add[i].numpy(),
rtol=1e-6,
atol=1e-4)
# fp16
stage2_params = train_mlp(mlp5,
sharding_stage=2,
use_pure_fp16=True,
opt_group=False)
stage3_params = train_mlp(mlp6,
sharding_stage=3,
use_pure_fp16=True,
opt_group=False)
for i in range(len(stage2_params)):
np.testing.assert_allclose(stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-4,
atol=1e-3)
# fp16 sync_comm
stage3_params = train_mlp(mlp7,
sharding_stage=3,
use_pure_fp16=True,
opt_group=False)
stage3_params_re = train_mlp(mlp8,
sharding_stage=3,
use_pure_fp16=True,
opt_group=False,
sync_comm=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(stage3_params[i].numpy(),
stage3_params_re[i].numpy(),
rtol=1e-6)
# save/load model
output_dir = tempfile.mkdtemp()
try:
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage3, optimizer_stage3 = train_mlp(mlp9,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
save_model=True)
paddle.save(model_stage3.state_dict(), model_file)
paddle.save(optimizer_stage3.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage3.set_state_dict(m_state_dict)
optimizer_stage3.set_state_dict(opt_state_dict)
except Exception as e:
shutil.rmtree(output_dir)
raise e
else:
shutil.rmtree(output_dir)
# check optimizer.minimize() error
train_mlp(mlp10,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
test_minimize=True)
if __name__ == '__main__':
with _test_eager_guard():
test_stage2_stage3()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import numpy as np
import argparse
import tempfile
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
seed = 2022
epoch = 2
linear_size = 1000
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
"dp_degree": 16,
"mp_degree": 1,
"pp_degree": 1,
"sharding_degree": 1
}
np.random.seed(seed)
paddle.seed(seed)
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, linear_size)
self._linear4 = Linear(linear_size, linear_size)
self._linear5 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
y = self._linear4(y)
y = self._linear5(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(parameters=[{
"params": model.parameters()
}] if opt_group else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model,
sharding_stage,
batch_size=100,
use_pure_fp16=False,
accumulate_grad=False,
opt_group=False,
save_model=False):
if sharding_stage == "dp":
hcg = fleet.get_hybrid_communicate_group()
group = hcg.get_check_parallel_group()
else:
group = paddle.distributed.new_group(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
if opt_group:
optimizer = optimizer_setting(model=model,
use_pure_fp16=use_pure_fp16,
opt_group=opt_group)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if sharding_stage == 2:
optimizer = ShardingOptimizerStage2(params=model.parameters(),
optim=optimizer,
group=group)
model = ShardingStage2(model,
optimizer,
group=group,
buffer_max_size=2**21)
else:
optimizer = fleet.distributed_optimizer(optimizer)
model = fleet.distributed_model(model)
train_reader = paddle.batch(reader_decorator(),
batch_size=batch_size,
drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
if sharding_stage == 2:
model.to(device="gpu")
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
out = model(img)
loss = paddle.nn.functional.cross_entropy(input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
avg_loss.backward()
if not accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if save_model:
return model, optimizer
return model.parameters()
def test_dp_stage2():
mlp = MLP()
state_dict = mlp.state_dict()
mlp1 = MLP()
mlp2 = MLP()
mlp3 = MLP()
mlp4 = MLP()
mlp5 = MLP()
mlp6 = MLP()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
# DP VS stage2
dp_params = train_mlp(mlp1,
sharding_stage="dp",
use_pure_fp16=False,
opt_group=False)
stage2_params = train_mlp(mlp2,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False)
for i in range(len(dp_params)):
np.testing.assert_allclose(dp_params[i].numpy(),
stage2_params[i].numpy(),
rtol=1e-6,
atol=5e-4)
# stage2 accumulate grad
stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True)
stage2_accumulate_grad = train_mlp(mlp4,
sharding_stage=2,
batch_size=20,
accumulate_grad=True)
for i in range(len(stage2_params)):
np.testing.assert_allclose(stage2_params[i].numpy(),
stage2_accumulate_grad[i].numpy(),
rtol=1e-5,
atol=1e-5)
# stage2 param list VS param group
stage2_params = train_mlp(mlp5,
sharding_stage=2,
use_pure_fp16=False,
opt_group=True)
for i in range(len(dp_params)):
np.testing.assert_allclose(dp_params[i].numpy(),
stage2_params[i].numpy(),
rtol=1e-6,
atol=5e-4)
# save/load model
output_dir = tempfile.mkdtemp()
try:
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage2, optimizer_stage2 = train_mlp(mlp6,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False,
save_model=True)
paddle.save(model_stage2.state_dict(), model_file)
paddle.save(optimizer_stage2.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage2.set_state_dict(m_state_dict)
optimizer_stage2.set_state_dict(opt_state_dict)
except Exception as e:
shutil.rmtree(output_dir)
raise e
else:
shutil.rmtree(output_dir)
if __name__ == '__main__':
with _test_eager_guard():
pass
fleet.init(is_collective=True, strategy=strategy)
test_dp_stage2()
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
unset https_proxy http_proxy
export FLAGS_rpc_disable_reuse_port=1
export MPIRUN=${EXE_MPIRUN:-""} # MPIRUN="mpirun"
name=${TEST_TARGET_NAME}
TEST_TIMEOUT=${TEST_TIMEOUT}
if [[ ${name}"x" == "x" ]]; then
    echo "can't find TEST_TARGET_NAME, please set TEST_TARGET_NAME first"
    exit 1
fi
if [[ ${TEST_TIMEOUT}"x" == "x" ]]; then
    echo "can't find TEST_TIMEOUT, please set TEST_TIMEOUT first"
    exit 1
fi
# remove logs left over from previous runs
rm -f ${name}_*.log
# start the unit test
run_time=$(( $TEST_TIMEOUT - 10 ))
echo "run_time: ${run_time}"
if [[ ${WITH_COVERAGE} == "ON" ]]; then
PYTHON_EXEC="python3 -u -m coverage run --branch -p "
else
PYTHON_EXEC="python3 -u "
fi
unset PYTHONPATH
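# Run the test under a hard timeout; when EXE_MPIRUN is set (e.g. "mpirun"), the
# command is launched through MPI so the test can span multiple nodes.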
timeout -s SIGKILL ${run_time} ${MPIRUN} ${PYTHON_EXEC} ${name}.py > ${name}_run.log 2>&1
exit_code=$?
if [[ $exit_code -eq 0 ]]; then
exit 0
fi
echo "${name} faild with ${exit_code}"
echo "after run ${name}"
ps -aux
netstat -anlp
# paddle log
echo "${name} log"
for log in `ls ${name}_*.log`
do
printf "\ncat ${log}\n"
cat -n ${log}
done
# check CUDA or ROCM env
GPU_SYS_INFO_CMD=nvidia-smi
which ${GPU_SYS_INFO_CMD}
exit_code=$?
if [[ $exit_code -ne 0 ]]; then
GPU_SYS_INFO_CMD=rocm-smi
fi
which ${GPU_SYS_INFO_CMD}
exit_code=$?
if [[ $exit_code -ne 0 ]]; then
echo "nvidia-smi or rocm-smi faild with ${exit_code}"
exit ${exit_code}
fi
#display system context
for i in {1..2}; do
sleep 3
ps -aux
netstat -anlp
if hash "${GPU_SYS_INFO_CMD}" > /dev/null; then
${GPU_SYS_INFO_CMD}
fi
done
echo "dist space:"
df -h
#display /tmp/files
echo "ls /tmp/paddle.*"
ls -l /tmp/paddle.*
echo "ls -l ./"
ls -l ./
exit 1
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import unittest
import time
import argparse
import os
import sys
import subprocess
import traceback
import functools
import pickle
import tempfile
from contextlib import closing
import paddle
import paddle.fluid as fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import socket
class TestCollectiveAPIRunnerBase(object):
def check_pass(self, *args, **kwargs):
        raise NotImplementedError(
            "check_pass should be implemented by child class.")
def run_trainer(self, *args, **kwargs):
self.check_pass(*args, **kwargs)
def runtime_main(test_class, col_type=None):
args = {}
model = test_class()
args["static_mode"] = 0
model.run_trainer(**args)
class TestDistBase(unittest.TestCase):
def setUp(self):
self._trainers = 4
self._init_env()
def _init_env(self):
self._python_interp = sys.executable
self.temp_dir = tempfile.TemporaryDirectory()
def check_with_place(self,
model_file,
backend="nccl",
static_mode=False,
check_error_log=False,
need_envs={},
eager_mode=True,
args=[],
kwargs={}):
required_envs = {
"FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_eager_delete_tensor_gb": "0.0",
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
"FLAGS_call_stack_level": "2",
"GLOG_v": "0",
"NCCL_P2P_DISABLE": "1",
"PADDLE_WITH_GLOO": "0",
"BACKEND": backend,
"PADDLE_DISTRI_BACKEND": backend,
"PADDLE_USE_GPU": "1"
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "0"
required_envs["GLOG_logtostderr"] = "1"
required_envs["GLOO_LOG_LEVEL"] = "TRACE"
if eager_mode:
required_envs["FLAGS_enable_eager_mode"] = "%d" % 1
else:
required_envs["FLAGS_enable_eager_mode"] = "%d" % 0
self._run_cluster(model_file, required_envs)
def _run_cluster(self, model_file, envs):
run_cluster_process = f"{self._python_interp} -u -m paddle.distributed.launch --log_dir {self.temp_dir.name} {model_file}"
        # drop PADDLE_* variables (except PADDLE_NNODES / PADDLE_MASTER) so that
        # paddle.distributed.launch can set up its own environment
        filtered_envs = dict()
        for k in envs.keys():
            if "PADDLE_" == k[:7] and k not in [
                    "PADDLE_NNODES", "PADDLE_MASTER"
            ]:
                continue
            filtered_envs[k] = envs[k]
        launcher = subprocess.Popen(run_cluster_process.strip().split(),
                                    stdout=sys.stderr,
                                    stderr=sys.stdout,
                                    env=filtered_envs)
launcher.communicate(timeout=240)
if launcher.poll() is None:
self.temp_dir.cleanup()
raise TimeoutError
elif launcher.poll() != 0:
self.temp_dir.cleanup()
raise RuntimeError("test failed!")
self.temp_dir.cleanup()
#!/usr/bin/python3
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle
from test_collective_multi_nodes import TestDistBase
import os
class TestDygraphDPMode(TestDistBase):
def setUp(self):
self._trainers = 16
self._init_env()
    def test_hybrid_dp(self):
self.check_with_place("dygraph_hybrid_dp.py",
backend="nccl",
need_envs=os.environ)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/python3
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle
from test_collective_multi_nodes import TestDistBase
import os
class TestDygraphHybrid(TestDistBase):
def setUp(self):
self._trainers = 16
self._init_env()
def test_hybrid_dpppmp(self):
self.check_with_place("dygraph_hybrid_dpppmp.py",
backend="nccl",
need_envs=os.environ)
def test_hybrid_recompute(self):
self.check_with_place("dygraph_hybrid_recompute.py",
backend="nccl",
need_envs=os.environ)
def test_hybrid_fp16(self):
self.check_with_place("dygraph_hybrid_fp16.py",
backend="nccl",
need_envs=os.environ)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/python3
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle
from test_collective_multi_nodes import TestDistBase
import os
class TestDygraphShardingDP(TestDistBase):
def setUp(self):
self._trainers = 16
self._init_env()
def test_hybrid_sharding_stage2(self):
self.check_with_place("mn_dygraph_sharding_stage2.py",
backend="nccl",
need_envs=os.environ)
def test_hybrid_sharding_stage3(self):
self.check_with_place("mn_dygraph_group_sharded_stage3.py",
backend="nccl",
need_envs=os.environ)
if __name__ == '__main__':
unittest.main()