未验证 提交 60c3ef3a 编写于 作者: 1 123malin 提交者: GitHub

【paddle.fleet】parameter_server_optimizer support auto_strategy (#27181)

* parameter_server_optimizer support auto_strategy
上级 fde5cfe8
......@@ -1019,7 +1019,7 @@ class Fleet(object):
if self.user_defined_strategy._is_strict_auto():
# turn on all the strategy for each optimizer
for opt in distributed_optimizer_list:
opt._enable_strategy(self.user_defined_strategy)
opt._enable_strategy(self.user_defined_strategy, context)
valid_optimizer_list = []
valid_graph_optimizer_list = []
......
......@@ -34,6 +34,9 @@ class AMPOptimizer(MetaOptimizerBase):
loss, role_maker, user_defined_optimizer, user_defined_strategy)
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if self.user_defined_strategy.amp:
return True
return False
......@@ -42,7 +45,7 @@ class AMPOptimizer(MetaOptimizerBase):
dist_strategy.amp = False
dist_strategy.amp_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
dist_strategy.amp = True
dist_strategy.amp_configs = {
"init_loss_scaling": 32768.0,
......
......@@ -53,6 +53,9 @@ class DGCOptimizer(MetaOptimizerBase):
name=opt._name)
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if self.user_defined_strategy.dgc:
if not isinstance(self.inner_opt, Momentum):
logging.warn("dgc only works on Momentum optimizer")
......@@ -69,7 +72,7 @@ class DGCOptimizer(MetaOptimizerBase):
dist_strategy.dgc = False
dist_strategy.dgc_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
dist_strategy.dgc = True
dist_strategy.dgc_configs = {"rampup_begin_step": 0, "rampup_step": 1}
......
......@@ -37,6 +37,9 @@ class GradientMergeOptimizer(MetaOptimizerBase):
self.user_defined_strategy.gradient_merge_configs["avg"])
def _can_apply(self):
if not self.role_maker._is_collective:
return False
can_apply = (self.user_defined_strategy.gradient_merge == True) and \
self.user_defined_strategy.gradient_merge_configs["k_steps"] > 1
return can_apply
......@@ -45,7 +48,7 @@ class GradientMergeOptimizer(MetaOptimizerBase):
dist_strategy.gradient_merge = False
dist_strategy.gradient_merge_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
# we currently do not support auto-enable gradient merge
return
......
......@@ -190,7 +190,7 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
# TODO(guru4elephant): should close all PE related flags here
return
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
# by default, graph execution strategy is enabled
return
......
......@@ -62,6 +62,9 @@ class LambOptimizer(MetaOptimizerBase):
name=opt._name)
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if self.user_defined_strategy.lamb:
if not isinstance(self.inner_opt, AdamOptimizer):
logging.warn(
......@@ -75,7 +78,7 @@ class LambOptimizer(MetaOptimizerBase):
dist_strategy.lamb = False
dist_strategy.lamb_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
dist_strategy.lamb = True
dist_strategy.lamb_configs = {
"lamb_weight_decay": 0.01,
......
......@@ -49,6 +49,9 @@ class LarsOptimizer(MetaOptimizerBase):
epsilon=configs['epsilon'])
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if self.user_defined_strategy.lars:
if not isinstance(self.inner_opt, Momentum):
logging.warn(
......@@ -62,7 +65,7 @@ class LarsOptimizer(MetaOptimizerBase):
dist_strategy.lars = False
dist_strategy.lars_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
dist_strategy.lars = True
dist_strategy.lars_configs = {
"lars_coeff": 0.01,
......
......@@ -29,6 +29,9 @@ class LocalSGDOptimizer(MetaOptimizerBase):
self.snapshot_key = '@SNAPSHOT'
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if not self.user_defined_strategy.localsgd:
return False
......@@ -44,7 +47,7 @@ class LocalSGDOptimizer(MetaOptimizerBase):
dist_strategy.localsgd = False
dist_strategy.localsgd_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
dist_strategy.localsgd = True
dist_strategy.localsgd_configs = {"k_steps": 1}
......
......@@ -48,7 +48,7 @@ class MetaOptimizerBase(Optimizer):
raise NotImplementedError("you should implement disable strategy in {}".
format(type(self).__name__))
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context=None):
raise NotImplementedError("you should implement enable strategy in {}".
format(type(self).__name__))
......
......@@ -24,6 +24,9 @@ class ParameterServerGraphOptimizer(ParameterServerOptimizer):
self.meta_optimizers_white_list = []
def _can_apply(self):
if self.role_maker._is_collective:
return False
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
if k_steps < 0:
return False
......@@ -37,12 +40,11 @@ class ParameterServerGraphOptimizer(ParameterServerOptimizer):
return True
def _disable_strategy(self, dist_strategy):
dist_strategy.a_sync_configs = {}
return
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
# only open up the async mode for auto-parallel
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {}
return
def _is_graph_out(self):
return True
......
......@@ -32,8 +32,6 @@ class ParameterServerOptimizer(MetaOptimizerBase):
def _can_apply(self):
if self.role_maker._is_collective:
return False
if self.user_defined_strategy.auto == True:
return True
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
return True if k_steps >= 0 else False
......@@ -134,7 +132,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
return _main, _startup
def _try_auto_apply_geo(self, program, compiled_config):
def _can_apply_geo(self, dist_strategy, program):
def get_sys_free_mem():
plat = platform.system()
if platform.system() == "Darwin":
......@@ -163,36 +161,28 @@ class ParameterServerOptimizer(MetaOptimizerBase):
"%s platform is unsupported is parameter server optimizer" %
(platform.system()))
if self.user_defined_strategy.auto == False:
return
a_sync_configs = self.user_defined_strategy.a_sync_configs
if a_sync_configs["k_steps"] >= 0:
return
self.user_defined_strategy.a_sync = True
if not isinstance(self.inner_opt, fluid.optimizer.SGDOptimizer):
# auto async
a_sync_configs["k_steps"] = 0
self.user_defined_strategy.a_sync_configs = a_sync_configs
return
return False
from paddle.fluid.incubate.fleet.parameter_server.ir.vars_metatools import dtype_to_size
free = get_sys_free_mem()
param_grad_pairs = compiled_config.origin_sparse_pairs + compiled_config.origin_dense_pairs
processed_var_names = set(["@EMPTY@"])
from paddle.fluid.incubate.fleet.parameter_server.ir import vars_metatools
processed_var_names = set(["@EMPTY@"])
param_memory_size = 0
for param_grad_pair in param_grad_pairs:
param, grad = param_grad_pair
for varname in program.global_block().vars:
var = program.global_block().vars[varname]
if not var.persistable or var.desc.type(
) != core.VarDesc.VarType.LOD_TENSOR:
continue
param = vars_metatools.create_var_struct(var)
param_memory_size += param.m_size
processed_var_names.add(param.name)
processed_var_names.add(varname)
upper_mem_use = param_memory_size * 5.0
program_tmp_vars = dict()
batch_size = 1024
eval_batch_size = 1024
for op in program.global_block().ops:
for var_name in op.output_arg_names:
if var_name in processed_var_names:
......@@ -215,23 +205,21 @@ class ParameterServerOptimizer(MetaOptimizerBase):
data_count *= (-x)
else:
data_count *= x
program_tmp_vars[var_name] = (data_count, neg_dim_count,
dtype_to_size[var.dtype])
program_tmp_vars[var_name] = (
data_count, neg_dim_count,
vars_metatools.dtype_to_size[var.dtype])
for varname in program_tmp_vars:
data_count, neg_dim_count, type_size = program_tmp_vars[varname]
if neg_dim_count == 1:
data_count *= batch_size
data_count *= eval_batch_size
var_memory = data_count * type_size
upper_mem_use += var_memory
if upper_mem_use < free:
# auto geo
a_sync_configs["k_steps"] = 800
return True
else:
# auto async
a_sync_configs["k_steps"] = 0
self.user_defined_strategy.a_sync_configs = a_sync_configs
return False
def minimize_impl(self,
loss,
......@@ -240,6 +228,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
no_grad_set=None):
self.inner_opt.minimize(loss, startup_program, parameter_list,
no_grad_set)
strategy = self._get_distributed_strategy()
_origin_main_program = loss.block.program
_origin_startup_program = startup_program
......@@ -247,11 +236,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
compiled_config = public.CompileTimeStrategy(_origin_main_program,
_origin_startup_program,
None, self.role_maker)
self._try_auto_apply_geo(_origin_main_program, compiled_config)
strategy = self._get_distributed_strategy()
strategy, self.role_maker)
compiled_config.strategy = strategy
if self.role_maker.is_worker() or self.role_maker._is_heter_worker():
......@@ -267,9 +252,24 @@ class ParameterServerOptimizer(MetaOptimizerBase):
return None, None
def _disable_strategy(self, dist_strategy):
dist_strategy.a_sync_configs = {}
self.user_defined_strategy.a_sync_configs = {}
dist_strategy.a_sync = False
a_sync_configs = dist_strategy.a_sync_configs
a_sync_configs["k_steps"] = -1
dist_strategy.a_sync_configs = a_sync_configs
def _enable_strategy(self, dist_strategy, context):
a_sync_configs = dist_strategy.a_sync_configs
if a_sync_configs["k_steps"] >= 0:
return
def _enable_strategy(self, dist_strategy):
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {}
a_sync_configs = dist_strategy.a_sync_configs
is_geo = self._can_apply_geo(dist_strategy,
context["origin_main_program"])
if is_geo:
a_sync_configs["k_steps"] = 800
else:
a_sync_configs["k_steps"] = 0
dist_strategy.a_sync_configs = a_sync_configs
......@@ -103,6 +103,9 @@ class PipelineOptimizer(MetaOptimizerBase):
self.wrapped_opt = PO(self.inner_opt, num_microbatches=num_microbatches)
def _can_apply(self):
if not self.role_maker._is_collective:
return False
if self.user_defined_strategy.pipeline == True:
return True
return False
......@@ -111,7 +114,7 @@ class PipelineOptimizer(MetaOptimizerBase):
dist_strategy.pipeline = False
dist_strategy.pipeline_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
# we do not support enable pipeline automatically right now
return
......
......@@ -38,6 +38,9 @@ class RecomputeOptimizer(MetaOptimizerBase):
list(user_defined_strategy.recompute_configs["checkpoints"]))
def _can_apply(self):
if self.role_maker._is_collective:
return False
if self.user_defined_strategy.recompute == True:
if len(self.user_defined_strategy.recompute_configs[
"checkpoints"]) == 0:
......@@ -49,7 +52,7 @@ class RecomputeOptimizer(MetaOptimizerBase):
dist_strategy.recompute = False
dist_strategy.recompute_configs = {}
def _enable_strategy(self, dist_strategy):
def _enable_strategy(self, dist_strategy, context):
# we do not support automatically recompute checkpoints currently
return
......
......@@ -441,8 +441,6 @@ if(WITH_DISTRIBUTE)
# FIXME(seiriosX) will fix this
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_sparse_embedding_ctr")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_gloo")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_a_sync_optimizer_auto")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ctr")
py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS})
py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS})
......
......@@ -62,82 +62,6 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
a_sync_configs = optimizer.user_defined_strategy.a_sync_configs
self.assertTrue(a_sync_configs['k_steps'] == 0)
def test_a_sync_optimizer2(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.distributed.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
self.assertTrue(optimizer.user_defined_strategy.a_sync)
a_sync_configs = optimizer.user_defined_strategy.a_sync_configs
self.assertTrue(a_sync_configs['k_steps'] == 800)
def test_a_sync_optimizer3(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.distributed.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x",
shape=[-1, 1],
dtype="int64",
lod_level=1,
append_batch_size=False)
x_embedding = paddle.fluid.layers.embedding(
is_distributed=False,
input=input_x,
size=[1000000000, 100000],
param_attr=paddle.fluid.ParamAttr(
name="embedding",
initializer=paddle.fluid.initializer.Constant(value=0.01)),
is_sparse=True)
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=x_embedding, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
self.assertTrue(optimizer.user_defined_strategy.a_sync)
a_sync_configs = optimizer.user_defined_strategy.a_sync_configs
self.assertTrue(a_sync_configs['k_steps'] == 0)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
import paddle.distributed.fleet.base.role_maker as role_maker
import time
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_a_sync_optimizer3(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.distributed.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x",
shape=[-1, 1],
dtype="int64",
lod_level=1,
append_batch_size=False)
x_embedding = paddle.fluid.layers.embedding(
is_distributed=False,
input=input_x,
size=[1000000000, 100000],
param_attr=paddle.fluid.ParamAttr(
name="embedding",
initializer=paddle.fluid.initializer.Constant(value=0.01)),
is_sparse=True)
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=x_embedding, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
self.assertTrue(optimizer.user_defined_strategy.a_sync)
a_sync_configs = optimizer.user_defined_strategy.a_sync_configs
self.assertTrue(a_sync_configs['k_steps'] == 0)
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
import paddle.distributed.fleet.base.role_maker as role_maker
import time
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_a_sync_optimizer2(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.distributed.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
self.assertTrue(optimizer.user_defined_strategy.a_sync)
a_sync_configs = optimizer.user_defined_strategy.a_sync_configs
self.assertTrue(a_sync_configs['k_steps'] == 800)
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册