未验证 提交 66596bd2 编写于 作者: L liuyuhui 提交者: GitHub

【paddle.fleet】Fix the initial configuration of fleet and role maker (#26368)

* solve the initial configuration about fleet and rolemaker
Co-authored-by: NseiriosPlus <tangwei12@baidu.com>
上级 faa9b97b
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
from __future__ import print_function from __future__ import print_function
import paddle import paddle
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
from .strategy_compiler import StrategyCompiler from .strategy_compiler import StrategyCompiler
from .distributed_strategy import DistributedStrategy from .distributed_strategy import DistributedStrategy
from .meta_optimizer_factory import MetaOptimizerFactory from .meta_optimizer_factory import MetaOptimizerFactory
...@@ -74,9 +75,33 @@ class Fleet(object): ...@@ -74,9 +75,33 @@ class Fleet(object):
def __init__(self):
    """Create an uninitialized Fleet; init() must be called before use."""
    # Role maker and collective flag are configured later by init().
    self._role_maker = None
    self._is_collective = False
    # Runtime handle and util object are populated lazily elsewhere.
    self._runtime_handle = None
    self._util = None
def init(self, role_maker=None, is_collective=False):
    """
    Initialize the role maker used by Fleet.

    Selects the distributed architecture your code runs behind (such as
    Transpiler or Collective mode) via a PaddleCloudRoleMaker or a
    user-provided role maker.

    Args:
        role_maker: an instance of RoleMakerBase, or None to build a
            PaddleCloudRoleMaker automatically.
        is_collective (bool): only used when role_maker is None; whether
            the auto-created PaddleCloudRoleMaker runs in collective mode.

    Raises:
        ValueError: if role_maker is neither None nor a RoleMakerBase
            instance, or if is_collective is not a bool.

    Returns:
        None
    """
    if isinstance(role_maker, RoleMakerBase):
        self._role_maker = role_maker
    # Fix: compare to the None singleton with `is`, not `==` (PEP 8);
    # `==` can be hijacked by a custom __eq__ on role_maker.
    elif role_maker is None:
        if isinstance(is_collective, bool):
            self._is_collective = is_collective
            self._role_maker = PaddleCloudRoleMaker(
                is_collective=self._is_collective)
        else:
            raise ValueError(
                "Something wrong occurred, please check whether is_collective is bool value"
            )
    else:
        raise ValueError(
            "Something wrong occurred, please check whether rolemaker is instance of RoleMakerBase"
        )
    self.strategy_compiler = StrategyCompiler()
    return None
......
...@@ -165,10 +165,10 @@ class RoleMakerBase(object): ...@@ -165,10 +165,10 @@ class RoleMakerBase(object):
class PaddleCloudRoleMaker(RoleMakerBase): class PaddleCloudRoleMaker(RoleMakerBase):
def __init__(self, is_collective=False, init_gloo=True, **kwargs): def __init__(self, is_collective=False, **kwargs):
super(PaddleCloudRoleMaker, self).__init__() super(PaddleCloudRoleMaker, self).__init__()
self._is_collective = is_collective self._is_collective = is_collective
self._init_gloo = init_gloo self._init_gloo = False #default no init gloo
self._kwargs = kwargs self._kwargs = kwargs
self._role_is_generated = False self._role_is_generated = False
...@@ -204,30 +204,35 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -204,30 +204,35 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._prefix = os.getenv("SYS_JOB_ID", "") self._prefix = os.getenv("SYS_JOB_ID", "")
def _barrier(self, comm_world):
    """Block until every member of ``comm_world`` reaches this point."""
    # Guard clause: without an initialized Gloo world there is nothing
    # to synchronize on, so warn and fall through.
    if not isinstance(comm_world, fluid.core.Gloo):
        print("warning: must init Gloo before using _barrier() function")
        return
    comm_world.barrier()
def _all_gather(self, comm_world, input):
    """Gather ``input`` from every member of ``comm_world``.

    Returns the gathered result, or None when Gloo is not initialized.
    """
    if not isinstance(comm_world, fluid.core.Gloo):
        print("warning: must init Gloo before using _all_gather() function")
        return None
    # Synchronize first so all members enter the gather together.
    self._barrier(comm_world)
    return comm_world.all_gather(input)
def _all_reduce(self, comm_world, input, mode="sum"):
    """All-reduce ``input`` across ``comm_world`` using ``mode``.

    The input is flattened to a Python list for Gloo, reduced, then
    reshaped back to the original shape. Returns None when Gloo is not
    initialized.
    """
    if not isinstance(comm_world, fluid.core.Gloo):
        print("warning: must init Gloo before using _all_reduce() function")
        return None
    arr = np.array(input)
    flat = arr.reshape(-1).tolist()
    # Barrier first so every member starts the reduce together.
    self._barrier(comm_world)
    reduced = comm_world.all_reduce(flat, mode)
    return np.array(reduced).reshape(arr.shape)
def is_worker(self): def is_worker(self):
""" """
...@@ -460,6 +465,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -460,6 +465,8 @@ class PaddleCloudRoleMaker(RoleMakerBase):
if not self._role_is_generated: if not self._role_is_generated:
if not self._is_collective: if not self._is_collective:
self._ps_env() self._ps_env()
if "PADDLE_WITH_GLOO" in os.environ:
self._init_gloo = bool(os.environ["PADDLE_WITH_GLOO"])
if self._init_gloo: if self._init_gloo:
self._init_gloo_env() self._init_gloo_env()
else: else:
......
...@@ -25,7 +25,7 @@ import numpy ...@@ -25,7 +25,7 @@ import numpy
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
......
...@@ -21,7 +21,7 @@ import os ...@@ -21,7 +21,7 @@ import os
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
......
...@@ -17,7 +17,7 @@ import time ...@@ -17,7 +17,7 @@ import time
import unittest import unittest
import paddle import paddle
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import unittest import unittest
import paddle import paddle
import os import os
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import time import time
......
...@@ -16,7 +16,7 @@ import unittest ...@@ -16,7 +16,7 @@ import unittest
import paddle import paddle
import os import os
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import time import time
......
...@@ -168,6 +168,8 @@ class TestDistGloo_2x2(TestFleetBase): ...@@ -168,6 +168,8 @@ class TestDistGloo_2x2(TestFleetBase):
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36013,127.0.0.1:36014", "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36013,127.0.0.1:36014",
"PADDLE_TRAINERS_NUM": "2", "PADDLE_TRAINERS_NUM": "2",
"PADDLE_PSERVER_ID": "0", "PADDLE_PSERVER_ID": "0",
#GLOO FLAG
"PADDLE_WITH_GLOO": "1",
} }
required_envs.update(need_envs) required_envs.update(need_envs)
......
...@@ -21,7 +21,7 @@ import shutil ...@@ -21,7 +21,7 @@ import shutil
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
# For Net # For Net
......
...@@ -29,7 +29,6 @@ class TestFleetBase(unittest.TestCase): ...@@ -29,7 +29,6 @@ class TestFleetBase(unittest.TestCase):
def test_ps_minimize(self): def test_ps_minimize(self):
import paddle import paddle
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
os.environ["TRAINING_ROLE"] = "PSERVER" os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1" os.environ["POD_IP"] = "127.0.0.1"
...@@ -46,7 +45,7 @@ class TestFleetBase(unittest.TestCase): ...@@ -46,7 +45,7 @@ class TestFleetBase(unittest.TestCase):
input=prediction, label=input_y) input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost) avg_cost = paddle.fluid.layers.mean(x=cost)
role = role_maker.PaddleCloudRoleMaker(is_collective=False) role = fleet.PaddleCloudRoleMaker(is_collective=False)
fleet.init(role) fleet.init(role)
strategy = paddle.distributed.fleet.DistributedStrategy() strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False strategy.a_sync = False
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
import paddle.fluid as fluid
class TestFleetBase(unittest.TestCase):
def setUp(self):
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_fleet_init(self):
import paddle.distributed.fleet as fleet
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
role = fleet.PaddleCloudRoleMaker(is_collective=False)
fleet.init(role)
fleet.init()
fleet.init(is_collective=False)
self.assertRaises(Exception, fleet.init, is_collective="F")
self.assertRaises(Exception, fleet.init, role_maker="F")
if __name__ == "__main__":
    # Run the test suite when this file is executed directly.
    unittest.main()
...@@ -17,7 +17,7 @@ import paddle ...@@ -17,7 +17,7 @@ import paddle
from paddle import fluid from paddle import fluid
import os import os
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
from paddle.distributed.fleet.meta_optimizers.meta_optimizer_base import MetaOptimizerBase from paddle.distributed.fleet.meta_optimizers.meta_optimizer_base import MetaOptimizerBase
......
...@@ -56,7 +56,7 @@ class TestFleetUtil(unittest.TestCase): ...@@ -56,7 +56,7 @@ class TestFleetUtil(unittest.TestCase):
def test_get_util(self): def test_get_util(self):
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True) role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role) fleet.init(role)
default_util = fleet.util default_util = fleet.util
...@@ -72,7 +72,7 @@ class TestFleetUtil(unittest.TestCase): ...@@ -72,7 +72,7 @@ class TestFleetUtil(unittest.TestCase):
def get_user_id(self): def get_user_id(self):
return 10 return 10
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True) role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role) fleet.init(role)
my_util = UserDefinedUtil() my_util = UserDefinedUtil()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册