Unverified commit a96d54ac, authored by Dong Daxiang, committed via GitHub.

Generate final strategy (#25782)

* refine strategy compiler and meta optimizers; make `async` into `a_sync`

Parent commit: 0ca1bb56
......@@ -85,7 +85,7 @@ message DistributedStrategy {
optional bool pipeline = 9 [ default = false ];
optional bool elastic = 10 [ default = false ];
optional bool auto = 11 [ default = false ];
optional bool async = 12 [ default = true ];
optional bool a_sync = 12 [ default = true ];
optional bool sync_nccl_allreduce = 13 [ default = true ];
optional int32 nccl_comm_num = 14 [ default = 1 ];
optional bool use_hierarchical_allreduce = 15 [ default = false ];
......@@ -99,7 +99,7 @@ message DistributedStrategy {
optional LocalSGDConfig localsgd_configs = 103;
optional GradientMergeConfig gradient_merge_configs = 104;
optional PipelineConfig pipeline_configs = 106;
optional AsyncConfig async_configs = 107;
optional AsyncConfig a_sync_configs = 107;
optional BuildStrategy build_strategy = 201;
optional ExecutionStrategy execution_strategy = 202;
......
......@@ -201,7 +201,7 @@ class DistributedStrategy(object):
f.name).extend(getattr(strategy, f.name))
@property
def a_sync(self):
    """Whether to use asynchronous stochastic gradient descent updates.

    This property is valid when we are using parameter server training.
    Defaults to True.

    Examples:
      .. code-block:: python

        import paddle.fleet as fleet
        fleet.init(role_maker)

        strategy = fleet.DistributedStrategy()
        strategy.a_sync = True  # by default this is True

        # code block for defining loss and local optimizer
        # sgd = fleet.distributed_optimizer(optimizer, strategy)
    """
    return self.strategy.a_sync

@a_sync.setter
def a_sync(self, flag):
    # Only bool is accepted; any other type is ignored with a warning so the
    # previous value is preserved.
    if isinstance(flag, bool):
        self.strategy.a_sync = flag
    else:
        print("WARNING: a_sync should have value of bool type")
@property
def a_sync_configs(self):
    """Set a_sync update configurations through a dict.

    In general, asynchronous parameter server training has several
    configurable settings.

    **Notes**:
        **Detailed arguments for a_sync_configs**

        **k_step**: number of local optimization updates before communication

        **max_merge_var_num**: maximum number of merged gradients before communication

        **send_queue_size**: a buffer size of worker communication

    NOTE(review): the docs here say "k_step" while the unit tests use
    "k_steps" -- confirm the canonical key name against the proto definition.

    Examples:
      .. code-block:: python

        import paddle.fleet as fleet
        fleet.init(role_maker)

        strategy = fleet.DistributedStrategy()
        strategy.a_sync = True  # by default this is True
        configs = {"k_step": 10000, "send_queue_size": 32}
        strategy.a_sync_configs = configs

        # code block for defining loss and local optimizer
        # sgd = fleet.distributed_optimizer(optimizer, strategy)
    """
    return get_msg_dict(self.strategy.a_sync_configs)

@a_sync_configs.setter
def a_sync_configs(self, configs):
    # Reject unknown keys first, then copy accepted values into the proto.
    check_configs_key(self.strategy.a_sync_configs, configs,
                      "a_sync_configs")
    assign_configs_value(self.strategy.a_sync_configs, configs)
@property
def amp(self):
......@@ -584,4 +585,7 @@ class DistributedStrategy(object):
print("WARNING: auto should have value of bool type")
def __repr__(self):
    """Return the protobuf text representation of the underlying strategy."""
    # Removed leftover debug loop that printed every field's default value to
    # stdout: __repr__ must be side-effect free and only return a string.
    return str(self.strategy)
......@@ -228,6 +228,7 @@ class Fleet(object):
"""
self.user_defined_optimizer = optimizer
self.user_defined_strategy = strategy
self.valid_strategy = None
return self
def minimize(self,
......@@ -292,8 +293,10 @@ class Fleet(object):
distributed_optimizer_list = \
MetaOptimizerFactory()._get_valid_meta_optimizers(
self.user_defined_optimizer)
valid_optimizer_list = []
valid_graph_optimizer_list = []
can_not_apply_optimizer_list = []
# recall meta optimizers for ranking
for opt in distributed_optimizer_list:
opt._set_basic_info(loss, self._role_maker,
......@@ -301,15 +304,21 @@ class Fleet(object):
self.user_defined_strategy)
if opt._can_apply() and not opt._is_graph_out():
valid_optimizer_list.append(opt)
if opt._can_apply() and opt._is_graph_out():
elif opt._can_apply() and opt._is_graph_out():
valid_graph_optimizer_list.append(opt)
else:
can_not_apply_optimizer_list.append(opt)
# combine recalled meta optimizers to be a valid meta optimizer
meta_optimizer, graph_optimizer, final_dist_strategy = \
meta_optimizer, graph_optimizer = \
self.strategy_compiler.generate_optimizer(
loss, self._role_maker, self.user_defined_optimizer,
self.user_defined_strategy, valid_optimizer_list,
valid_graph_optimizer_list)
valid_strategy = self.strategy_compiler._get_valid_strategy(
self.user_defined_strategy, can_not_apply_optimizer_list)
self.valid_strategy = valid_strategy
optimize_ops = []
params_grads = []
if meta_optimizer:
......@@ -332,12 +341,10 @@ class Fleet(object):
if self._runtime_handle is None:
self._runtime_handle = RuntimeFactory()._create_runtime(
final_dist_strategy, self._role_maker, optimize_ops,
params_grads)
valid_strategy, self._role_maker, optimize_ops, params_grads)
if self._util is None:
self._util = UtilFactory()._create_util(final_dist_strategy,
self._role_maker,
optimize_ops, params_grads)
self._util = UtilFactory()._create_util(
valid_strategy, self._role_maker, optimize_ops, params_grads)
return optimize_ops, params_grads
......@@ -30,7 +30,7 @@ def maximum_path_len_algo(optimizer_list):
return None
for idx, opt in enumerate(candidates[max_idx][:-1]):
opt._update_inner_optimizer(candidates[max_idx][idx + 1])
return candidates[max_idx][0]
return candidates[max_idx]
class StrategyCompilerBase(object):
......@@ -51,19 +51,55 @@ class StrategyCompiler(StrategyCompilerBase):
def __init__(self):
    """Initialize compiler state; nothing is selected until generate_optimizer runs."""
    super(StrategyCompiler, self).__init__()
    self._meta_optimizer = None
    self._graph_optimizer = None
    self._valid_optimizer_list = None
    self._user_defined_strategy = None
    self._meta_optimizer_candidates = []
    self._graph_optimizer_candidates = []
    # Initialize to empty lists so _get_valid_strategy can be called safely
    # even when generate_optimizer took its early-return path and never
    # assigned these attributes (previously an AttributeError).
    self._meta_optimizers = []
    self._graph_optimizers = []
def _get_valid_strategy(self, dist_strategy, can_not_apply_optimizer_list):
    """Return a copy of *dist_strategy* with the settings of every meta
    optimizer that was NOT selected disabled.

    Args:
        dist_strategy: the user-defined DistributedStrategy.
        can_not_apply_optimizer_list: optimizers whose _can_apply() was False.

    Returns:
        A strategy object independent of the user's original.
    """
    import copy
    # deepcopy, not copy: a shallow copy shares the underlying protobuf
    # (self.strategy), so _disable_strategy calls would mutate the user's
    # original strategy through the shared message.
    valid_strategy = copy.deepcopy(dist_strategy)
    # Set of class names actually selected; guard with getattr so this also
    # works when generate_optimizer never assigned _meta_optimizers.
    chosen_names = {
        opt.__class__.__name__
        for opt in (getattr(self, "_meta_optimizers", None) or [])
    }
    for candidate in self._meta_optimizer_candidates:
        if candidate.__class__.__name__ not in chosen_names:
            candidate._disable_strategy(valid_strategy)
    for opt in can_not_apply_optimizer_list:
        opt._disable_strategy(valid_strategy)
    return valid_strategy
def generate_optimizer(self, loss, role_maker, optimizer,
                       user_defined_strategy, meta_optimizer_list,
                       graph_optimizer_list):
    """Select the meta/graph optimizer combination to use.

    Records the candidates and the chosen chains on self, then returns a
    (meta_optimizer, graph_optimizer) pair; either element may be None.
    When no candidate applies at all, the user's optimizer is returned
    unchanged.
    """
    self._user_defined_strategy = user_defined_strategy
    self._meta_optimizer_candidates = meta_optimizer_list
    self._graph_optimizer_candidates = graph_optimizer_list

    if len(meta_optimizer_list) == 0 and len(graph_optimizer_list) == 0:
        return optimizer, None
    else:
        # currently, we use a heuristic algorithm to select the
        # meta optimizer combination: the longest composable chain wins
        meta_optimizers = maximum_path_len_algo(meta_optimizer_list)
        graph_optimizers = maximum_path_len_algo(graph_optimizer_list)
        # Store empty lists instead of None so downstream iteration
        # (e.g. _get_valid_strategy) never trips over a None value.
        self._meta_optimizers = meta_optimizers or []
        self._graph_optimizers = graph_optimizers or []
        return_meta = None if meta_optimizers is None else meta_optimizers[0]
        return_graph = None if graph_optimizers is None else graph_optimizers[0]
        return return_meta, return_graph
......@@ -16,6 +16,7 @@ from paddle.fluid.framework import core
from paddle.fluid import compiler
from .meta_optimizer_base import MetaOptimizerBase
from ..base.private_helper_function import wait_server_ready
import logging
class GraphExecutionOptimizer(MetaOptimizerBase):
......@@ -32,6 +33,10 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
"""
Basically, this is PE, and almost all programs can be executed here
"""
if not self.role_maker._is_collective:
# update me. currently, if parameter server is used
# graph execution optimizer can not be applied
return False
return True
def backward(self,
......@@ -178,6 +183,10 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
return self._compiled_program
def _disable_strategy(self, dist_strategy):
    # Intentionally a no-op for now: this optimizer does not yet reset any
    # PE-related (presumably ParallelExecutor -- confirm) flags on the
    # strategy when it is not applied.
    # TODO(guru4elephant): should close all PE related flags here
    pass
def minimize(self,
loss,
startup_program=None,
......
......@@ -39,6 +39,9 @@ class MetaOptimizerBase(object):
if str(optimizer.__class__.__name__) in self.meta_optimizers_white_list:
return True
def _disable_strategy(self, dist_strategy):
    """Subclass hook: strip this optimizer's settings from *dist_strategy*.

    Every concrete meta optimizer must override this so a non-applied
    optimizer can be erased from the final strategy.
    """
    raise NotImplementedError("you should implement disable strategy")
def minimize_impl(self,
loss,
startup_program=None,
......
......@@ -34,11 +34,16 @@ class RecomputeOptimizer(MetaOptimizerBase):
def _can_apply(self):
    """Recompute applies only when it is enabled AND checkpoints are set.

    Returns an explicit bool (the previous implicit None fall-through when
    recompute was off is equivalent under truthiness, but explicit is better).
    """
    if not self.user_defined_strategy.recompute:
        return False
    return len(
        self.user_defined_strategy.recompute_configs["checkpoints"]) > 0
def _disable_strategy(self, dist_strategy):
    # Revert recompute settings on the strategy to their defaults: clear the
    # checkpoint list and switch the feature off.
    dist_strategy.recompute_configs = {"checkpoints": []}
    dist_strategy.recompute = False
def backward(self,
loss,
startup_program=None,
......
......@@ -178,20 +178,20 @@ class TestStrategyConfig(unittest.TestCase):
strategy.lamb = "True"
self.assertEqual(strategy.lamb, False)
def test_a_sync(self):
    """a_sync accepts bools; non-bool values are rejected (value unchanged)."""
    strategy = paddle.fleet.DistributedStrategy()
    strategy.a_sync = True
    self.assertEqual(strategy.a_sync, True)
    strategy.a_sync = False
    self.assertEqual(strategy.a_sync, False)
    # Assigning a string only prints a warning; the stored value stays False.
    strategy.a_sync = "True"
    self.assertEqual(strategy.a_sync, False)
def test_a_sync_configs(self):
    """a_sync_configs round-trips a config dict through the proto."""
    strategy = paddle.fleet.DistributedStrategy()
    configs = {"k_steps": 1000}
    strategy.a_sync_configs = configs
    self.assertEqual(strategy.a_sync_configs["k_steps"], 1000)
def test_elastic(self):
strategy = paddle.fleet.DistributedStrategy()
......@@ -213,7 +213,7 @@ class TestStrategyConfig(unittest.TestCase):
def test_strategy_prototxt(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.async_update = True
strategy.a_sync = True
strategy.localsgd = True
strategy.dgc = True
localsgd_configs = {"k_steps": 5}
......
......@@ -25,6 +25,27 @@ class TestFleetMetaOptimizer(unittest.TestCase):
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_graph_execution_optimizer_not_apply(self):
    # With a parameter-server (non-collective) role, the graph execution
    # optimizer reports it cannot apply; minimize must still succeed.
    import paddle.fleet as fleet
    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
    cloud_role = role_maker.PaddleCloudRoleMaker()
    fleet.init(cloud_role)

    x_var = paddle.fluid.layers.data(
        name="x", shape=[32], dtype='float32')
    y_var = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
    hidden1 = paddle.fluid.layers.fc(input=x_var, size=64, act='tanh')
    hidden2 = paddle.fluid.layers.fc(input=hidden1, size=64, act='tanh')
    prediction = paddle.fluid.layers.fc(
        input=[hidden2], size=2, act='softmax')
    cost = paddle.fluid.layers.cross_entropy(
        input=prediction, label=y_var)
    avg_cost = paddle.fluid.layers.mean(x=cost)

    strategy = paddle.fleet.DistributedStrategy()
    optimizer = paddle.optimizer.SGD(learning_rate=0.01)
    optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
    optimizer.minimize(avg_cost)
def test_graph_execution_optimizer(self):
import paddle.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
......@@ -42,7 +63,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.nccl_comm_num = 2
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
......@@ -65,7 +86,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
strategy = paddle.fleet.DistributedStrategy()
strategy.recompute = True
strategy.recompute_checkpoints = [fc_2]
strategy.recompute_configs = {"checkpoints": ["fc2"]}
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
......
Markdown is supported.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register or sign in.