未验证 提交 4920c474 编写于 作者: S ShenLiang 提交者: GitHub

[HybridParallel]Fix pipeline in dygraph (#33007)

* fix pipeline

* fix mp pp dp

* fix utest of hybrid parallel

* add utest for tuple
上级 6ad5ece5
......@@ -253,3 +253,8 @@ class HybridCommunicateGroup(object):
# check parallel group
def get_check_parallel_group(self):
return self._check_comm_group
def get_rank_from_stage(self, stage_id):
coord = self._topo.get_coord(self.global_rank)
tf = coord._replace(pipe=stage_id)._asdict()
return self._topo.get_rank(**tf)
......@@ -89,12 +89,14 @@ class HybridParallelOptimizer:
self._inner_opt = optimizer
self._strategy = strategy
self._hcg = hcg
self._is_mp = (
self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL)
self._use_dp_mode = (
self._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL)
self._need_dp = (self._hcg.get_data_parallel_world_size() > 1)
if isinstance(self._inner_opt._grad_clip,
ClipGradByGlobalNorm) and self._is_mp:
ClipGradByGlobalNorm) and not self._use_dp_mode:
logger.warning("using ClipGradByGlobalNorm in TensorParallel, the origin " \
"optmizer'grad clip will be changed.")
self._inner_opt._grad_clip = HybridParallelClipGrad(
......@@ -103,7 +105,7 @@ class HybridParallelOptimizer:
@imperative_base.no_grad
@framework.dygraph_only
def step(self):
if self._is_mp and self._need_dp:
if not self._use_dp_mode and self._need_dp:
fused_allreduce_gradients(
list(self._inner_opt._parameter_list), self._hcg)
self._inner_opt.step()
......@@ -119,7 +121,7 @@ class HybridParallelOptimizer:
parameter_list = parameters if parameters \
else self._parameter_list
if self._is_mp and self._need_dp:
if not self._use_dp_mode and self._need_dp:
fused_allreduce_gradients(list(parameter_list), self._hcg)
return self._inner_opt.minimize(loss, startup_program, parameters,
......
......@@ -14,20 +14,51 @@
import abc
import paddle
from ...utils import hybrid_parallel_util as hp_util
from ...utils import log_util as hp_util
__all__ = []
FLOAT_TYPES = [
paddle.float16,
paddle.float32,
paddle.float64,
]
FLOAT_TYPE_DICT = {
paddle.float16: "float16",
paddle.float32: "float32",
paddle.float64: "float64",
}
PADDLE_TO_NUMBER = {
paddle.float16: 0,
paddle.float32: 1,
paddle.float64: 2,
paddle.int32: 3,
paddle.int64: 4
}
NUMBER_TO_DTYPE = {
0: "float16",
1: "float32",
2: "float64",
3: "int32",
4: "int64"
}
def is_float_tensor(tensor):
"""Is a float tensor"""
return tensor.dtype in FLOAT_TYPES
return tensor.dtype in FLOAT_TYPE_DICT.keys()
def get_tensor_dtype(dtype):
assert dtype in FLOAT_TYPE_DICT.keys()
return FLOAT_TYPE_DICT[dtype]
def paddle_2_number(dtype):
assert dtype in PADDLE_TO_NUMBER.keys()
return PADDLE_TO_NUMBER[dtype]
def number_2_dtype(number):
assert number in NUMBER_TO_DTYPE.keys()
return NUMBER_TO_DTYPE[number]
def get_tensor_bytes(tensor):
......@@ -48,78 +79,3 @@ def get_tensor_bytes(tensor):
else:
raise ValueError("unknown data type: {}".format(tensor.dtype))
return tensor.numel() * elem_size
class Generator():
def __init__(self, micro_batches, stages, stage_id):
__metaclass__ = abc.ABCMeta
self.micro_batches = micro_batches
self.stages = stages
self.stage_id = stage_id
self.prev_stage = self.stage_id - 1
self.next_stage = self.stage_id + 1
@abc.abstractmethod
def generate(self):
pass
def __iter__(self):
self.iter = None
return self
def __next__(self):
if self.iter is None:
self.iter = self.generate()
return next(self.iter)
class TrainGenerator(Generator):
def generate(self):
startup_steps = self.stages - self.stage_id - 1
cmds = []
forward_steps = 0
backward_steps = 0
#while (forward_steps < startup_steps):
# cmds.append(Forward(cache_id=forward_steps))
# forward_steps += 1
#while (forward_steps < self.micro_batches):
# cmds.append(Forward(cache_id=forward_steps))
# forward_steps += 1
# cmds.append(Backward(cache_id=backward_steps))
# backward_steps += 1
#while (backward_steps < self.micro_batches):
# cmds.append(Backward(cache_id=backward_steps))
# backward_steps += 1
#cmds.append(Optimize())
while (forward_steps < self.micro_batches):
cmds.append(Forward(cache_id=forward_steps))
forward_steps += 1
while (backward_steps < self.micro_batches):
cmds.append(Backward(cache_id=backward_steps))
backward_steps += 1
cmds.append(Optimize())
yield cmds
class Command:
def __init__(self, **kwargs):
self.name = self.__class__.__name__
self.kwargs = kwargs
for key, val in kwargs.items():
setattr(self, key, val)
def __repr__(self):
return hp_util.call_to_str(self.name, **self.kwargs)
class Optimize(Command):
pass
class Forward(Command):
pass
class Backward(Command):
pass
......@@ -23,7 +23,8 @@ list(APPEND DIST_TEST_OPS test_gen_nccl_id_op)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_layer)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_parallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_tensor_parallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_mp_layers)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
......@@ -179,7 +180,8 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_layer)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_parallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_tensor_parallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_mp_layers)
LIST(REMOVE_ITEM TEST_OPS test_imperative_auto_mixed_precision)
LIST(REMOVE_ITEM TEST_OPS test_fleet_base_single)
......@@ -558,7 +560,7 @@ if(WITH_DISTRIBUTE)
set(dist_ut_port 20001)
foreach(TEST_OP ${DIST_TEST_OPS})
bash_test_modules(${TEST_OP} START_BASH dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}")
MATH(EXPR dist_ut_port "${dist_ut_port}+40")
MATH(EXPR dist_ut_port "${dist_ut_port}+35")
if(dist_ut_port GREATER_EQUAL 22998)
message(FATAL_ERROR "available ports have been exhausted:${dist_ut_port}")
endif()
......@@ -866,7 +868,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
set_tests_properties(test_parallel_dygraph_dataparallel PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_pipeline_layer PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_pipeline_parallel PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_mp_layers PROPERTIES TIMEOUT 120)
if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212)
set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 120)
......
......@@ -37,6 +37,7 @@ hidden_size = 10
inner_size = 8
output_size = 2
seq_length = 2
batch_size = 4
class SimpleMPNet(fluid.dygraph.Layer):
......@@ -130,18 +131,6 @@ class SimpleDPNet(fluid.dygraph.Layer):
return x
class TrainDataset(Dataset):
def __init__(self, length):
self.length = length
def __len__(self):
return self.length
def __getitem__(self, index):
np_input_data = np.random.randint(0, vocab_size, (seq_length, ))
return np_input_data
class TestDistMPTraning(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
......@@ -178,20 +167,6 @@ class TestDistMPTraning(unittest.TestCase):
np_fc1 = np.random.random_sample((hidden_size, inner_size))
np_fc2 = np.random.random_sample((inner_size, hidden_size))
train_data = TrainDataset(length=10000)
train_batch_sampler = paddle.io.DistributedBatchSampler(
train_data,
batch_size=4,
shuffle=False,
num_replicas=self.data_parallel_size,
rank=dp_id)
train_data_loader = DataLoader(
dataset=train_data,
batch_sampler=train_batch_sampler,
num_workers=0,
return_list=True)
model_a = SimpleMPNet(vocab_size, hidden_size, inner_size, output_size,
np_fc1, np_fc2, mp_id)
optimizer_a = self.build_optimizer(model_a)
......@@ -202,16 +177,17 @@ class TestDistMPTraning(unittest.TestCase):
np_fc1, np_fc2)
optimizer_b = self.build_optimizer(model_b)
return model_a, optimizer_a, model_b, optimizer_b, train_data_loader
return model_a, optimizer_a, model_b, optimizer_b
def test_mp_model(self):
model_a, optimizer_a, model_b, optimizer_b, train_data_loader = self.build_model_optimizer(
model_a, optimizer_a, model_b, optimizer_b = self.build_model_optimizer(
)
for step, batch in enumerate(train_data_loader):
if step > 5:
return
for _ in range(5):
np_data = np.random.randint(0, vocab_size, (
batch_size,
seq_length, ))
batch = paddle.to_tensor(np_data)
loss_a = self.train_batch(batch, model_a, optimizer_a, True)
loss_b = self.train_batch(batch, model_b, optimizer_b, False)
......
......@@ -15,39 +15,25 @@
from __future__ import division
from __future__ import print_function
import unittest
import paddle
import numpy as np
import random
import paddle
import paddle.distributed as dist
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
from paddle.io import DataLoader, Dataset
import unittest
from hybrid_parallel_pp_layer import AlexNetPipeDesc, AlexNet
def set_random_seed(seed, dp_id, rank_id):
"""Set random seed for reproducability."""
random.seed(seed)
np.random.seed(seed + dp_id)
paddle.seed(seed + rank_id)
paddle.seed(seed + dp_id)
HIDDEN_DIM = 32
LAYERS = 8
def sequential_model():
model = paddle.nn.Sequential(
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
paddle.nn.Linear(HIDDEN_DIM, 1), )
return model
batch_size = 4
micro_batch_size = 2
class TestDistPPTraning(unittest.TestCase):
......@@ -61,32 +47,73 @@ class TestDistPPTraning(unittest.TestCase):
"mp_degree": self.model_parallel_size,
"pp_degree": self.pipeline_parallel_size,
}
strategy.pipeline_configs = {"accumulate_steps": 2}
paddle.distributed.init_parallel_env()
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size
}
fleet.init(is_collective=True, strategy=strategy)
def test_mp_model(self):
batch_input = paddle.randn(shape=(1, HIDDEN_DIM), dtype="float32")
pipe_model = sequential_model()
sgd = paddle.optimizer.SGD(learning_rate=0.0003, parameters=[])
pipe_model = paddle.distributed.fleet.distributed_model(pipe_model)
if pipe_model.stage_id == 0 or pipe_model.stage_id == 1:
pipe_input = batch_input.clone().detach()
pipe_input = paddle.cast(pipe_input, 'float32')
def data_gen():
gen = True
while gen:
yield [pipe_input, 0]
gen = False
loader = paddle.io.DataLoader.from_generator(capacity=5)
loader.set_batch_generator(data_gen)
data_iter = iter(loader)
else:
data_iter = None
return True
def test_pp_model(self):
hcg = fleet.get_hybrid_communicate_group()
word_size = hcg.get_model_parallel_world_size()
dp_id = hcg.get_data_parallel_rank()
pp_id = hcg.get_stage_id()
rank_id = dist.get_rank()
set_random_seed(1024, dp_id, rank_id)
#construct model a
model_a = AlexNet(10)
scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True)
optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
parameters=model_a.parameters())
param_len = len(model_a.parameters())
parameters = []
for param in model_a.parameters():
parameters.append(param.numpy())
# construct model b
model_b = AlexNetPipeDesc(num_stages=self.pipeline_parallel_size)
scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True)
optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
parameters=model_b.parameters())
model_b = fleet.distributed_model(model_b)
optimizer_b = fleet.distributed_optimizer(optimizer_b)
for idx, param in enumerate(model_b.parameters()):
param.set_value(parameters[idx + pp_id * (param_len // 2)])
# construct reader
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=batch_size, drop_last=True)
for step_id, data in enumerate(train_reader()):
x_data = np.array([x[0] for x in data]).astype('float32').reshape(
batch_size, 1, 28, 28)
y_data = np.array([x[1] for x in data]).astype('int64').reshape(
batch_size, 1)
img = paddle.to_tensor(x_data)
label = paddle.to_tensor(y_data)
img.stop_gradient = True
label.stop_gradient = True
if step_id >= 5:
return True
loss_a = model_a(img, label)
loss_a.backward()
optimizer_a.step()
optimizer_a.clear_grad()
scheduler_a.step()
loss_b = model_b.train_batch([img, label], optimizer_b, scheduler_b)
print("loss: ", loss_a.numpy(), loss_b.numpy())
np.testing.assert_allclose(
loss_a.numpy(), loss_b.numpy(), rtol=1e-5)
if __name__ == "__main__":
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import unittest
import paddle
import numpy as np
import random
import paddle
import paddle.distributed as dist
import paddle.distributed.fleet as fleet
from paddle.fluid.dygraph.container import Sequential
from paddle.distributed.fleet.meta_parallel import PipelineLayer
from paddle.fluid.dygraph.layers import Layer
import paddle.nn as nn
import paddle.fluid as fluid
def set_random_seed(seed, dp_id, rank_id):
"""Set random seed for reproducability."""
random.seed(seed)
np.random.seed(seed + dp_id)
paddle.seed(seed + dp_id)
batch_size = 16
micro_batch_size = 4
vocab_size = 128
hidden_size = 8
class SimpleNet(Layer):
def __init__(self):
super(SimpleNet, self).__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
self.softmax_weight = self.create_parameter(
shape=[hidden_size, vocab_size])
self.softmax_bias = self.create_parameter(
shape=[vocab_size], is_bias=False)
def forward(self, x1, x2, y1):
x_emb = self.word_embeddings(x1)
fc = fluid.layers.matmul(x_emb, self.softmax_weight)
fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
loss = fluid.layers.softmax_with_cross_entropy(
logits=projection, label=y1, soft_label=False)
return loss.mean()
class EmbeddingNet(Layer):
def __init__(self):
super(EmbeddingNet, self).__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
def forward(self, args):
x1, x2 = args
x_emb = self.word_embeddings(x1)
return x_emb, x2
class MatmulNet(Layer):
def __init__(self):
super(MatmulNet, self).__init__()
self.softmax_weight = self.create_parameter(
shape=[hidden_size, vocab_size])
def forward(self, args):
x1, x2 = args
fc = fluid.layers.matmul(x1, self.softmax_weight)
return fc, x2
class BiasNet(Layer):
def __init__(self):
super(BiasNet, self).__init__()
self.softmax_bias = self.create_parameter(shape=[vocab_size])
def forward(self, args):
fc, x2 = args
fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
return projection, x2
class LossNet(Layer):
def __init__(self):
super(LossNet, self).__init__()
def forward(self, args, y1):
projection, x2 = args
loss = fluid.layers.softmax_with_cross_entropy(
logits=projection, label=y1[0], soft_label=False)
return loss.mean()
class SimpleNetPipe(Layer):
def __init__(self):
super(SimpleNetPipe, self).__init__()
self.features = Sequential(EmbeddingNet(), MatmulNet(), BiasNet())
def to_layers(self):
feat = [self.features[i] for i in range(len(self.features))]
return feat
class TestDistEmbeddingTraning(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
self.model_parallel_size = 1
self.data_parallel_size = 1
self.pipeline_parallel_size = 2
strategy.hybrid_configs = {
"dp_degree": self.data_parallel_size,
"mp_degree": self.model_parallel_size,
"pp_degree": self.pipeline_parallel_size,
}
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size
}
fleet.init(is_collective=True, strategy=strategy)
def test_pp_model(self):
hcg = fleet.get_hybrid_communicate_group()
word_size = hcg.get_model_parallel_world_size()
dp_id = hcg.get_data_parallel_rank()
pp_id = hcg.get_stage_id()
rank_id = dist.get_rank()
set_random_seed(1024, dp_id, rank_id)
#construct model a
model_a = SimpleNet()
scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
parameters=model_a.parameters())
init_net = SimpleNetPipe()
model_b = PipelineLayer(
layers=init_net.to_layers(),
num_stages=self.pipeline_parallel_size,
loss_fn=LossNet())
scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
parameters=model_b.parameters())
model_b = fleet.distributed_model(model_b)
optimizer_b = fleet.distributed_optimizer(optimizer_b)
param_len = len(model_a.parameters())
parameters = []
for param in model_a.parameters():
print(param.name, param.shape)
parameters.append(param.numpy())
model_b_params = model_b.parameters()
if pp_id == 0:
model_b_params[0].set_value(parameters[2])
else:
model_b_params[0].set_value(parameters[0])
model_b_params[1].set_value(parameters[1])
for step in range(5):
x1_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
x2_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
y1_data = np.random.randint(0, 10, size=[batch_size, 1])
x1 = paddle.to_tensor(x1_data)
x2 = paddle.to_tensor(x2_data)
y1 = paddle.to_tensor(y1_data)
x1.stop_gradient = True
x2.stop_gradient = True
y1.stop_gradient = True
loss_a = model_a(x1, x2, y1)
loss_a.backward()
optimizer_a.step()
optimizer_a.clear_grad()
scheduler_a.step()
loss_b = model_b.train_batch([(x1, x2), (y1, )], optimizer_b,
scheduler_b)
print("loss", loss_a.numpy(), loss_b.numpy())
np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy())
if __name__ == "__main__":
unittest.main()
......@@ -12,17 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import os
import paddle
from paddle.distributed import fleet
import copy
from paddle.fluid.dygraph.container import Sequential
import paddle.nn as nn
from paddle.fluid.dygraph.layers import Layer
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer
import paddle.nn.functional as F
import unittest
class ReshapeHelp(Layer):
def __init__(self, shape):
super(ReshapeHelp, self).__init__()
self.shape = shape
def forward(self, x):
return x.reshape(shape=self.shape)
class AlexNet(Layer):
......@@ -30,7 +38,7 @@ class AlexNet(Layer):
super(AlexNet, self).__init__()
self.features = Sequential(
nn.Conv2D(
3, 64, kernel_size=11, stride=4, padding=5),
1, 64, kernel_size=11, stride=4, padding=5),
nn.ReLU(),
nn.MaxPool2D(
kernel_size=2, stride=2),
......@@ -50,13 +58,14 @@ class AlexNet(Layer):
nn.ReLU(),
nn.MaxPool2D(
kernel_size=2, stride=2), )
self.reshape_layer = ReshapeHelp(shape=[-1, 256])
self.classifier = nn.Linear(256, num_classes)
self.loss_fn = nn.loss.CrossEntropyLoss()
def forward(self, x, y):
x = self.features(x)
x.flatten()
x = self.reshape_layer(x)
x = self.classifier(x)
return self.loss_fn(x, y)
......@@ -64,7 +73,7 @@ class AlexNet(Layer):
class AlexNetPipe(AlexNet):
def to_layers(self):
feat = [self.features[i] for i in range(len(self.features))]
loss_fn = [lambda x: x.flatten(), self.classifier]
loss_fn = [self.reshape_layer, self.classifier]
feat.extend(loss_fn)
return feat
......@@ -74,7 +83,7 @@ class AlexNetPipeDesc(PipelineLayer):
self.num_classes = num_classes
decs = [
LayerDesc(
nn.Conv2D, 3, 64, kernel_size=11, stride=4, padding=5),
nn.Conv2D, 1, 64, kernel_size=11, stride=4, padding=5),
LayerDesc(nn.ReLU),
LayerDesc(
nn.MaxPool2D, kernel_size=2, stride=2),
......@@ -94,7 +103,8 @@ class AlexNetPipeDesc(PipelineLayer):
F.relu,
LayerDesc(
nn.MaxPool2D, kernel_size=2, stride=2),
lambda x: x.flatten(),
LayerDesc(
ReshapeHelp, shape=[-1, 256]),
LayerDesc(nn.Linear, 256, self.num_classes), # classifier
]
super(AlexNetPipeDesc, self).__init__(
......@@ -104,24 +114,24 @@ class AlexNetPipeDesc(PipelineLayer):
class TestPipeLayerAPI(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
self.model_parallel_size = 2
self.pipeline_parallel_size = 2
strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": 1,
"pp_degree": self.model_parallel_size
"pp_degree": self.pipeline_parallel_size
}
fleet.init(is_collective=True, strategy=strategy)
self.hcg = fleet.get_hybrid_communicate_group()
def test_pipelayer_desc(self):
pipe_model = AlexNetPipeDesc(num_stages=self.model_parallel_size)
pipe_model = AlexNetPipeDesc(num_stages=self.pipeline_parallel_size)
np.testing.assert_array_equal(len(pipe_model.parameters()), 6)
def test_pipelayer_sequential(self):
init_net = AlexNetPipe()
pipe_model = PipelineLayer(
layers=init_net.to_layers(),
num_stages=self.model_parallel_size,
num_stages=self.pipeline_parallel_size,
loss_fn=nn.CrossEntropyLoss())
stage_id = self.hcg.get_stage_id()
init_parameters = init_net.parameters()
......
......@@ -17,8 +17,11 @@ from __future__ import print_function
import unittest
import time
import paddle.fluid as fluid
import copy
import os
import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, start_local_trainers
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus):
......@@ -46,6 +49,55 @@ def get_gpus(selected_gpus):
return selected_gpus
def start_local_trainers(cluster,
pod,
training_script,
training_script_args,
log_dir=None):
current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
procs = []
for t in pod.trainers:
proc_env = {
"FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
"PADDLE_TRAINER_ID": "%d" % t.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
}
current_env.update(proc_env)
print("trainer proc env:{}".format(current_env))
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
cmd = "python -m coverage run --branch -p " + training_script
else:
cmd = "python -u " + training_script
print("start trainer proc:{} env:{}".format(cmd, proc_env))
fn = None
proc = subprocess.Popen(cmd.split(" "), env=current_env)
tp = TrainerProc()
tp.proc = proc
tp.rank = t.rank
tp.log_fn = fn
tp.cmd = cmd
procs.append(tp)
return procs
class TestMultipleGpus(unittest.TestCase):
def run_mnist_2gpu(self, target_file_name):
if not fluid.core.is_compiled_with_cuda(
......
......@@ -24,6 +24,9 @@ class TestHybridPipeParallel(TestMultipleGpus):
def test_hybrid_parallel_pp_layer(self):
self.run_mnist_2gpu('hybrid_parallel_pp_layer.py')
def test_hybrid_parallel_pp_tuple_inputs(self):
self.run_mnist_2gpu('hybrid_parallel_pp_embedding.py')
if __name__ == "__main__":
unittest.main()
......@@ -22,7 +22,7 @@ from test_parallel_dygraph_dataparallel import TestMultipleGpus
class TestPipelineParallel(TestMultipleGpus):
def test_pipeline_parallel(self):
self.run_mnist_2gpu('hybrid_parallel_pp_model.py')
self.run_mnist_2gpu('hybrid_parallel_pp_alexnet.py')
if __name__ == "__main__":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册