Unverified commit 9c17a899, authored by guru4elephant, committed by GitHub

upgrade collective fleet api (#18533)

* upgrade collective fleet api
Parent 5e1220ef
@@ -103,46 +103,6 @@ class RoleMakerBase(object):
return self._server_endpoints
class MultiProcessRoleMaker(RoleMakerBase):
"""
MultiProcessRoleMaker is a default role maker for multi-process
GPU training. It works with paddle.distributed.launch.py by design
"""
def __init__(self):
super(MultiProcessRoleMaker, self).__init__()
self._role_is_generated = False
def generate_role(self):
import os
if not self._role_is_generated:
self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self._num_trainers = 1
self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
assert (self._training_role == "TRAINER")
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
if self._worker_endpoints:
self._worker_endpoints = self._worker_endpoints.split(",")
self._num_trainers = len(self._worker_endpoints)
self._role_is_generated = True
def is_worker(self):
return True
def is_server(self):
return False
def is_first_worker(self):
return self._current_id == 0
def worker_index(self):
return self._current_id
def worker_num(self):
return self._worker_num
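# Minimal sketch (not part of this diff) of how the removed MultiProcessRoleMaker
# resolved its role from the environment variables set by paddle.distributed.launch;
# the endpoint values below are hypothetical.
import os

os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6170,127.0.0.1:6171"
os.environ["PADDLE_CURRENT_ENDPOINT"] = "127.0.0.1:6170"

role = MultiProcessRoleMaker()
role.generate_role()
assert role.is_worker() and role.is_first_worker()
assert role.worker_index() == 0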
class MPIRoleMaker(RoleMakerBase):
"""
MPIRoleMaker is an MPI-API based role maker which is a counterpart of K8SRoleMaker
@@ -361,34 +321,47 @@ class MPISymetricRoleMaker(MPIRoleMaker):
class PaddleCloudRoleMaker(RoleMakerBase):
def __init__(self):
def __init__(self, is_collective=False):
super(PaddleCloudRoleMaker, self).__init__()
self._role_is_generated = False
self._is_collective = is_collective
def generate_role(self):
if not self._role_is_generated:
self.port = os.getenv("PADDLE_PORT", "6174")
self.pserver_ips = os.getenv("PADDLE_PSERVERS", "")
eplist = []
for ip in self.pserver_ips.split(","):
eplist.append(':'.join([ip, self.port]))
self.endpoints = ",".join(eplist)
self._trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
self.current_endpoint = os.getenv("POD_IP",
"localhost") + ":" + self.port
self.role = os.getenv("TRAINING_ROLE", "TRAINER")
self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self.eplist = eplist
print("PaddleCloudRoleMaker() endpoints: %s" % self.endpoints)
self.endpoints = self.endpoints.split(",")
self._server_endpoints = self.endpoints
self._worker_endpoints = self.endpoints
if self.role.upper() == "PSERVER":
self._current_id = self.endpoints.index(self.current_endpoint)
self._role = Role.SERVER
if not self._is_collective:
self.port = os.getenv("PADDLE_PORT", "6174")
self.pserver_ips = os.getenv("PADDLE_PSERVERS", "")
eplist = []
for ip in self.pserver_ips.split(","):
eplist.append(':'.join([ip, self.port]))
self.endpoints = ",".join(eplist)
self._trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
self.current_endpoint = os.getenv("POD_IP",
"localhost") + ":" + self.port
self.role = os.getenv("TRAINING_ROLE", "TRAINER")
self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self.eplist = eplist
self.endpoints = self.endpoints.split(",")
self._server_endpoints = self.endpoints
self._worker_endpoints = self.endpoints
if self.role.upper() == "PSERVER":
self._current_id = self.endpoints.index(
self.current_endpoint)
self._role = Role.SERVER
else:
self._current_id = self.trainer_id
self._role = Role.WORKER
else:
self._current_id = self.trainer_id
self._role = Role.WORKER
self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self._training_role = os.getenv("PADDLE_TRAINING_ROLE",
"TRAINER")
assert (self._training_role == "TRAINER")
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
if self._worker_endpoints:
self._worker_endpoints = self._worker_endpoints.split(",")
self._num_trainers = len(self._worker_endpoints)
self._role_is_generated = True
def is_worker(self):
......
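# Minimal usage sketch (not part of this diff); the import path is an assumption
# for this branch and the endpoints are hypothetical placeholders.
import os
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker

# Collective (multi-GPU) mode: with is_collective=True the role is taken from
# the environment that paddle.distributed.launch exports for each process.
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6170,127.0.0.1:6171"
os.environ["PADDLE_CURRENT_ENDPOINT"] = "127.0.0.1:6170"

role = PaddleCloudRoleMaker(is_collective=True)
role.generate_role()  # guarded by _role_is_generated, so repeated calls are no-ops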
@@ -34,6 +34,7 @@ class DistributedStrategy(object):
self.h_allreduce = False
def build(self):
self.strategy_map = {}
# make sure we set single precision config True
if self.use_fp32 and self.use_fp16:
self.use_fp16 = False
@@ -48,75 +49,19 @@ class DistributedStrategy(object):
class DistributedOptimizerFactory(object):
def strategy_to_optimizer_map(self):
pattern = {}
pattern["fp16"] = [
"MixedPrecisionOptimizer", "MixedPrecisionLocalSGDOptimizer"
]
pattern["fp32"] = ["FullPrecisionOptimizer", "LocalSGDOptimizer"]
pattern["localsgd"] = [
"MixedPrecisionLocalSGDOptimizer", "LocalSGDOptimizer"
]
pattern["h_allreduce"] = [
"FullPrecisionOptimizer",
"LocalSGDOptimizer",
"MixedPrecisionOptimizer",
"MixedPrecisionLocalSGDOptimizer",
]
self.pattern = pattern
def create_by_strategy(self, optimizer, strategy):
if strategy == None:
strategy = DistributedStrategy()
strategy.build()
strategy_list = []
for key in strategy.strategy_map:
if strategy.strategy_map[key]:
strategy_list.append(self.pattern[key])
classname = list(set.intersection(*map(set, strategy_list)))[0]
return globals()[classname](optimizer, strategy)
class DistributedStrategy(object):
def __init__(self):
# precision configs
self.use_fp16 = False
self.use_fp32 = True
# algorithmic communication
self.local_sgd = False
self.dgc = False
# communication topology configs
self.h_allreduce = False
self.strategy_to_optimizer_map()
def build(self):
# make sure we set single precision config True
if self.use_fp32 and self.use_fp16:
self.use_fp16 = False
# make sure we set single algorithmic communication True
if self.local_sgd and self.dgc:
self.local_sgd = False
self.strategy_map["fp16"] = self.use_fp16
self.strategy_map["fp32"] = self.use_fp32
self.strategy_map["localsgd"] = self.local_sgd
self.strategy_map["dgc"] = self.dgc
self.strategy_map["h_allreduce"] = self.h_allreduce
class DistributedOptimizerFactory(object):
def strategy_to_optimizer_map(self):
pattern = {}
pattern["fp16"] = [
"MixedPrecisionOptimizer", "MixedPrecisionLocalSGDOptimizer"
]
pattern["fp32"] = ["FullPrecisionOptimizer", "LocalSGDOptimizer"]
pattern["localsgd"] = [
"MixedPrecisionLocalSGDOptimizer", "LocalSGDOptimizer"
]
pattern["fp16"] = ["FP16SGDOptimizer", "FP16LocalSGDOptimizer"]
pattern["fp32"] = ["FP32SGDOptimizer", "FP32LocalSGDOptimizer"]
pattern["localsgd"] = ["FP16LocalSGDOptimizer", "FP32LocalSGDOptimizer"]
pattern["h_allreduce"] = [
"FullPrecisionOptimizer",
"LocalSGDOptimizer",
"MixedPrecisionOptimizer",
"MixedPrecisionLocalSGDOptimizer",
"FP32SGDOptimizer",
"FP32LocalSGDOptimizer",
"FP16SGDOptimizer",
"FP16LocalSGDOptimizer",
]
self.pattern = pattern
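# Worked example (not part of this diff) of the selection rule used by
# create_by_strategy: collect the candidate list of every enabled strategy key
# and intersect them; with fp32 and localsgd enabled, FP32LocalSGDOptimizer is
# the only class present in both lists, so it is the one instantiated.
candidates = [
    ["FP32SGDOptimizer", "FP32LocalSGDOptimizer"],       # "fp32"
    ["FP16LocalSGDOptimizer", "FP32LocalSGDOptimizer"],  # "localsgd"
]
selected = list(set.intersection(*map(set, candidates)))[0]
assert selected == "FP32LocalSGDOptimizer"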
@@ -158,8 +103,10 @@ class Collective(Fleet):
"You should not call 'stop_worker' method for collective mode.")
def distributed_optimizer(self, optimizer, strategy=None):
optimizer_factory = DistributedOptimizerFactory()
self._optimizer = \
DistributedOptimizerFactory.create_by_strategy(optimizer, strategy)
optimizer_factory.create_by_strategy(optimizer, strategy)
return self._optimizer
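# Minimal end-to-end sketch (not part of this diff); the import paths and the
# toy network are assumptions, and the script is meant to run under
# paddle.distributed.launch so the trainer environment is already set.
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker

fleet.init(PaddleCloudRoleMaker(is_collective=True))

x = fluid.layers.data(name="x", shape=[1], dtype="float32")
y = fluid.layers.data(name="y", shape=[1], dtype="float32")
pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=pred, label=y))

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer)  # default strategy
optimizer.minimize(loss)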
def save_inference_model(self,
@@ -182,29 +129,13 @@ fleet = Collective()
class CollectiveOpBasedOptimizer(DistributedOptimizer):
"""
TBA
Collective Operator Base Class For Distributed Optimizer
This class is invisible to users
"""
def __init__(self, optimizer, strategy=None):
super(CollectiveOpBasedOptimizer, self).__init__(optimizer, strategy)
def _transpile_program(self, startup_program=None):
startup_program = startup_program if startup_program else \
fluid.framework.default_startup_program()
worker_endpoints = fleet.worker_endpoints()
trainer_id = fleet.worker_index()
current_endpoint = fleet.worker_endpoints()[trainer_id]
# call transpiler
config = dist_transpiler.DistributeTranspilerConfig()
config.mode = "collective"
config.collective_mode = "sgd"
t = dist_transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
trainers=','.join(worker_endpoints),
startup_program=startup_program,
current_endpoint=current_endpoint)
def backward(self,
loss,
startup_program=None,
@@ -218,11 +149,14 @@ class CollectiveOpBasedOptimizer(DistributedOptimizer):
return self._optimizer.apply_gradients(params_grads)
class MixedPrecisionOptimizer(CollectiveOpBasedOptimizer):
class FP16SGDOptimizer(CollectiveOpBasedOptimizer):
"""
TBA
Do allreduce within every minibatch
"""
def __init__(self, optimizer, strategy=None):
super(FP16SGDOptimizer, self).__init__(optimizer, strategy)
def minimize(self,
loss,
startup_program=None,
@@ -231,32 +165,51 @@ class MixedPrecisionOptimizer(CollectiveOpBasedOptimizer):
pass
class FullPrecisionOptimizer(CollectiveOpBasedOptimizer):
"""
TBA
"""
class FP32LocalSGDOptimizer(CollectiveOpBasedOptimizer):
def __init__(self, optimizer, strategy=None):
super(FP32LocalSGDOptimizer, self).__init__(optimizer, strategy)
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
opts, param_and_grads = self._optimizer.minimize(loss)
config = fluid.DistributeTranspilerConfig()
config.mode = 'collective'
config.collective_mode = 'local_sgd'
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id=fleet.worker_index(),
trainers=fleet.worker_endpoints(),
current_endpoint=fleet.worker_endpoints()[fleet.worker_index()],
startup_program=startup_program,
program=loss.block.program)
return opts, param_and_grads
class FP32SGDOptimizer(CollectiveOpBasedOptimizer):
def __init__(self, optimizer, strategy=None):
super(FullPrecisionOptimizer, self).__init__(optimizer, strategy)
super(FP32SGDOptimizer, self).__init__(optimizer, strategy)
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
self._transpile_program(startup_program)
train_program = loss.block.program
param_grads = self.backward(loss)
train_program.global_block().append_op(type='c_sync_compute_stream')
data_parallel_param_grads = []
for p, g in param_grads:
# NOTE: scale will be done on loss scale
# in multi_devices_graph_pass using nranks.
reduced_g = fluid.layers.collective._allreduce(g, g)
data_parallel_param_grads.append([p, reduced_g])
train_program.global_block().append_op(type='c_sync_comm_stream')
self.apply_gradients(data_parallel_param_grads)
opts, param_and_grads = self._optimizer.minimize(loss)
config = fluid.DistributeTranspilerConfig()
config.mode = 'collective'
config.collective_mode = 'grad_allreduce'
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id=fleet.worker_index(),
trainers=fleet.worker_endpoints(),
current_endpoint=fleet.worker_endpoints()[fleet.worker_index()],
startup_program=startup_program,
program=loss.block.program)
return opts, param_and_grads
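# Sketch (not part of this diff): FP32SGDOptimizer and FP32LocalSGDOptimizer differ
# only in the collective_mode handed to the transpiler -- 'grad_allreduce' roughly
# synchronizes gradients every step, while 'local_sgd' lets each trainer take local
# steps and then synchronize parameters. Selecting the LocalSGD variant through the
# strategy flags (reusing the names from the sketches above):
strategy = DistributedStrategy()
strategy.use_fp32 = True
strategy.local_sgd = True
strategy.build()
dist_optimizer = fleet.distributed_optimizer(
    fluid.optimizer.SGD(learning_rate=0.01), strategy)
# The factory intersects the fp32 and localsgd candidate lists and resolves to
# FP32LocalSGDOptimizer.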
class CollectiveOptimizer(DistributedOptimizer):
......