未验证 提交 99626502 编写于 作者: T tangwei12 提交者: GitHub

【paddle.fleet】gloo and util (#27213)

* fix worker endpoints

* fix gloo wrapper for hdfs

* GPU fleetrun support gloo

* parameterserver fleetrun support gloo

* fix get server endpoint
上级 a5ef246c
......@@ -19,6 +19,8 @@ limitations under the License. */
namespace gloo {
namespace rendezvous {
constexpr int kNodeSize = 136;
HdfsStore::HdfsStore(const std::string& path) {
path_ = path;
wait_sleep_ms_ = 10000;
......@@ -213,12 +215,14 @@ void ParallelConnectContext::connectFullMesh(
storeKey << rank;
store.set(storeKey.str(), allBytes);
auto total_add_size = kNodeSize * (size - 1);
std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
// Connect every pair
for (uint32_t i = 0; i < connect_threads.size(); ++i) {
connect_threads[i].reset(new std::thread(
[&store, &transportContext, this](size_t thread_idx,
size_t thread_num) -> void {
[&store, &transportContext, total_add_size, this](
size_t thread_idx, size_t thread_num) -> void {
for (int i = thread_idx; i < size; i += thread_num) {
if (i == rank) {
continue;
......@@ -226,8 +230,23 @@ void ParallelConnectContext::connectFullMesh(
// Wait for address of other side of this pair to become available
std::string key = std::to_string(i);
store.wait({key}, getTimeout());
std::vector<char> allAddrs;
auto max_retry_times = 5;
// Connect to other side of this pair
auto allAddrs = store.get(key);
while (max_retry_times > 0) {
allAddrs = store.get(key);
VLOG(3) << "store get all address size: " << allAddrs.size()
<< " except: " << total_add_size;
if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
break;
}
--max_retry_times;
}
auto addr = extractAddress(allAddrs, i);
transportContext->getPair(i)->connect(addr);
}
......
......@@ -39,6 +39,7 @@ server_num = fleet.server_num
server_index = fleet.server_index
server_endpoints = fleet.server_endpoints
is_server = fleet.is_server
set_util = fleet.set_util
util = fleet.util
barrier_worker = fleet.barrier_worker
init_worker = fleet.init_worker
......
......@@ -180,6 +180,8 @@ class Fleet(object):
raise ValueError(
"`role_maker` should be subclass of `RoleMakerBase`, but got {}".
format(type(role_maker)))
self._role_maker.generate_role()
self.strategy_compiler = StrategyCompiler()
if paddle.fluid.framework.in_dygraph_mode():
if parallel_helper._is_parallel_ctx_initialized():
......@@ -187,7 +189,6 @@ class Fleet(object):
"The dygraph parallel environment has been initialized.")
else:
paddle.distributed.init_parallel_env()
return None
def is_first_worker(self):
"""
......@@ -275,13 +276,10 @@ class Fleet(object):
fleet.worker_endpoints()
"""
'''
if to_string:
return ",".join(self._role_maker.get_trainer_endpoints())
else:
return self._role_maker.get_trainer_endpoints()
'''
return ["127.0.0.1:1001", "127.0.0.1:1002"]
def server_num(self):
"""
......@@ -355,7 +353,9 @@ class Fleet(object):
return self._role_maker.is_server(
) or self._role_maker._is_heter_worker()
@property
def set_util(self, util):
self._util = util
def util(self):
"""
Utility functions that can be used under certain runtime
......@@ -376,16 +376,6 @@ class Fleet(object):
"""
return self._util
@util.setter
def util(self, util):
"""
Set Utility functions for userd-defined runtime
Returns:
None
"""
self._util = util
def barrier_worker(self):
"""
barrier all workers
......@@ -393,7 +383,7 @@ class Fleet(object):
Returns:
None
"""
self._role_maker.barrier_worker()
self._role_maker._barrier("worker")
@is_non_distributed_check
@inited_runtime_handler
......
......@@ -13,18 +13,332 @@
# limitations under the License.
"""Defination of Role Makers."""
import os
import time
import numpy as np
import warnings
from multiprocessing import Process, Manager
import paddle.fluid as fluid
#__all__ = ['UserDefinedRoleMaker', 'PaddleCloudRoleMaker']
import paddle.fluid as fluid
class Role:
WORKER = 1
SERVER = 2
HETER_WORKER = 3
ALL = 4
class Gloo(object):
"""
Gloo is a universal class for barrier and collective communication
"""
class RENDEZVOUS:
HDFS = 1
FILE = 2
HTTP = 3
def __init__(self):
self._worker_comm = None
self._server_comm = None
self._nodes_comm = None
self._comm_world = ["worker", "server", "all"]
self._err_init = "gloo is not initialized, will not communicator with other nodes"
self._err_type = "gloo initialized error, please check arguments"
self._err_world = "argument error, comm_world must in {}".format(
self._comm_world)
self._is_initialized = False
self._init_timeout_seconds = 3600
self._run_timeout_seconds = 9999999
self._rendezvous = None
self._role = None
self._iface = None
self._role_id = -1
self._worker_num = -1
self._server_num = -1
self._need_init_all = False
def init(self,
rendezvous,
role,
role_id,
worker_num,
server_num,
need_init_all=False,
kwargs=None):
self._rendezvous = rendezvous
self._role = role
self._role_id = role_id
self._worker_num = worker_num
self._server_num = server_num
self._need_init_all = need_init_all
self._iface = self.__get_default_iface()
self._prefix = kwargs.get("store.prefix", "")
if self._rendezvous == Gloo.RENDEZVOUS.HDFS:
dfs_name = kwargs.get("dfs.name", "")
dfs_ugi = kwargs.get("dfs.ugi", "")
dfs_path = kwargs.get("dfs.path", "")
if not dfs_name or not dfs_ugi or not dfs_path:
raise ValueError(self._err_type)
self._init_dfs(dfs_name, dfs_ugi, dfs_path, self._prefix)
elif self._rendezvous == Gloo.RENDEZVOUS.FILE:
fs_path = kwargs.get("dfs.path", "")
if not fs_path:
raise ValueError(self._err_type)
self._init_fs(fs_path, self._prefix)
elif self._rendezvous == Gloo.RENDEZVOUS.HTTP:
ip = kwargs.get("http.host", "")
port = kwargs.get("http.port", "")
if not ip or not port:
raise ValueError(self._err_type)
self._init_http(ip, port, self._prefix)
else:
raise ValueError(self._err_type)
self._is_initialized = True
def _init_fs(self, fs_path, prefix):
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_hdfs_store(os.path.join(fs_path, role), "", "")
gloo.init()
return gloo
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _init_dfs(self, dfs_name, dfs_ugi, dfs_path, prefix):
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_hdfs_store(os.path.join(dfs_path, role), dfs_name, dfs_ugi)
gloo.init()
return gloo
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _init_http(self, ip, port, prefix):
def __start_kv_server(http_server_d, size_d):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(port, size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
time.sleep(wait_seconds)
http_server.stop()
def init_kv_server():
size_d = {
"trainer": self._worker_num,
"pserver": self._server_num,
"all": self._worker_num + self._server_num
}
_http_server_d = {"running": True}
# child process for http server
_http_server = Process(
target=__start_kv_server, args=(_http_server_d, size_d))
_http_server.daemon = True
# set running status to True
# start child process
_http_server.start()
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_http_store(ip, port, role)
return gloo
port = int(port)
if self._role == Role.SERVER and self._role_id == 0:
init_kv_server()
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _get_rank_nodes(self, role):
nodes = 0
rank = -1
if role == Role.WORKER:
nodes = self._worker_num
rank = self._role_id
elif role == Role.SERVER:
nodes = self._server_num
rank = self._role_id
elif role == Role.ALL:
nodes = self._worker_num + self._server_num
if self._role == Role.WORKER:
rank = self._role_id
else:
rank = self._worker_num + self._role_id
else:
ValueError(self._err_type)
return rank, nodes
def __get_default_iface(self):
"""
get default physical interface
"""
default1 = self.__get_default_iface_from_gateway()
default2 = self.__get_default_iface_from_interfaces()
return default2 if default1 == "lo" else default1
def __get_default_iface_from_gateway(self):
"""
get default physical interface
"""
import netifaces
gateways = netifaces.gateways()
if gateways.get(netifaces.AF_INET) != None:
gateway = gateways[netifaces.AF_INET]
if len(gateway) > 0 and len(gateway[0]) > 1:
return gateway[0][1]
return "lo"
def __get_default_iface_from_interfaces(self):
"""
get default physical interface
"""
import netifaces
for intf_name in netifaces.interfaces():
addresses = netifaces.ifaddresses(intf_name)
if netifaces.AF_INET in addresses:
ipv4_addresses = addresses[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
if 'broadcast' in ipv4_address:
return intf_name
return "lo"
def barrier(self, comm_world):
"""
dummy barrier, do nothing
"""
if not self._is_initialized:
warnings.warn(self._err_init)
return
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
if comm_world == "worker":
self._worker_comm.barrier()
elif comm_world == "server":
self._server_comm.barrier()
else:
self._nodes_comm.barrier()
def all_reduce(self, input, mode="sum", comm_world="worker"):
if not self._is_initialized:
warnings.warn(self._err_init)
return input
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
input = np.array(input)
input_shape = input.shape
input_list = input.reshape(-1).tolist()
self.barrier(comm_world)
if comm_world == "worker":
ans = self._worker_comm.all_reduce(input_list, mode)
elif comm_world == "server":
ans = self._server_comm.all_reduce(input_list, mode)
else:
ans = self._nodes_comm.all_reduce(input_list, mode)
output = np.array(ans).reshape(input_shape)
return output
def all_gather(self, input, comm_world="worker"):
"""
dummy all gather, do nothing
Args:
obj(any): obj to do all gather
"""
if not self._is_initialized:
warnings.warn(self._err_init)
return input
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
if comm_world == "worker":
output = self._worker_comm.all_gather(input)
elif comm_world == "server":
output = self._server_comm.all_gather(input)
else:
output = self._nodes_comm.all_gather(input)
return output
class RoleMakerBase(object):
......@@ -47,10 +361,6 @@ class RoleMakerBase(object):
self._heter_trainer_device = "CPU"
self._is_heter_parameter_server_mode = False
self._node_type = None
self._node_type_comm = None
self._all_comm = None
def is_worker(self):
"""
return is_worker() of current process
......@@ -142,19 +452,11 @@ class RoleMakerBase(object):
self._role, self._current_id, self._worker_endpoints,
self._server_endpoints)
def _all_gather(self, comm_world, input):
"""
Args:
input(int|float): input value
Returns:
return a list of values
"""
print("warning: RoleMakerBase does not have all gather.")
def _all_gather(self, input, comm_world="worker"):
print("warning: RoleMakerBase does not have all gather worker.")
return None
def _all_reduce(self, comm_world, input, mode="sum"):
def _all_reduce(self, input, mode="sum", comm_world="worker"):
"""
Args:
input(list/numpy.array): array of one dim
......@@ -221,73 +523,25 @@ class PaddleCloudRoleMaker(RoleMakerBase):
def __init__(self, is_collective=False, **kwargs):
super(PaddleCloudRoleMaker, self).__init__()
self._is_collective = is_collective
self._init_gloo = False # default no init gloo
self._kwargs = kwargs
self._non_distributed = False
self._kwargs = kwargs
self._role_is_generated = False
self._server_endpoints = None
self._worker_endpoints = None
self._node_type_comm = None
self._all_comm = None
self._non_distributed = False
if not self._is_collective:
self._hdfs_name = kwargs.get("hdfs_name", "")
self._hdfs_ugi = kwargs.get("hdfs_ugi", "")
self._hdfs_path = kwargs.get("path", "").rstrip("/")
self._init_timeout_seconds = kwargs.get("init_timeout_seconds",
3600)
self._run_timeout_seconds = kwargs.get("run_timeout_seconds",
9999999)
ip_port = kwargs.get("http_ip_port", "")
self._http_ip_port = []
self._http_server = None
# if ip_port is not empty, it will use http instead of hdfs
if ip_port != "":
self._http_ip_port = ip_port.split(":")
# it's for communication between processes
self._manager = Manager()
# global dict to store status
self._http_server_d = self._manager.dict()
# set running status of http server
self._http_server_d["running"] = False
self._iface = self.__get_default_iface()
# this environment variable can be empty
self._prefix = os.getenv("SYS_JOB_ID", "")
self._gloo = Gloo() # gloo instance
def _barrier(self, comm_world):
if isinstance(comm_world, fluid.core.Gloo):
comm_world.barrier()
else:
print("warning: must init Gloo before using _barrier() function")
def _all_gather(self, comm_world, input):
if isinstance(comm_world, fluid.core.Gloo):
self._barrier(comm_world)
output = comm_world.all_gather(input)
return output
else:
print("warning: must init Gloo before using _all_gather() function")
return None
def _all_reduce(self, comm_world, input, mode="sum"):
if isinstance(comm_world, fluid.core.Gloo):
input = np.array(input)
self._gloo.barrier(comm_world)
input_shape = input.shape
input_list = input.reshape(-1).tolist()
def _all_gather(self, input, comm_world="worker"):
return self._gloo.all_gather(input, comm_world)
self._barrier(comm_world)
ans = comm_world.all_reduce(input_list, mode)
output = np.array(ans).reshape(input_shape)
return output
else:
print("warning: must init Gloo before using _all_reduce() function")
return None
def _all_reduce(self, input, mode="sum", comm_world="worker"):
return self._gloo.all_reduce(input, mode, comm_world)
def is_worker(self):
"""
......@@ -349,7 +603,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
"""
if not self._role_is_generated:
self.generate_role()
return self._trainers_num
return len(self.get_pserver_endpoints())
def node_num(self):
"""
......@@ -421,8 +675,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
# Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
# format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002
self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
"").split(",")
if self._server_endpoints is None:
# back to non_distributed execution.
self._server_endpoints = ""
......@@ -436,6 +689,13 @@ class PaddleCloudRoleMaker(RoleMakerBase):
return
self._server_endpoints = self._server_endpoints.split(",")
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
if self._worker_endpoints:
self._worker_endpoints = self._worker_endpoints.split(",")
else:
self._worker_endpoints = []
trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
training_role = os.environ["TRAINING_ROLE"]
......@@ -506,6 +766,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
assert (self._training_role == "TRAINER")
self._role = Role.WORKER
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
self._cur_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
if self._worker_endpoints is None:
......@@ -518,74 +779,64 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
def _init_gloo_env(self):
def init_gloo_instance(role="trainer"):
role = role.lower()
assert role in ["trainer", "pserver", "all"]
if role == "trainer":
all_list = self._worker_endpoints
rank = self._current_id
elif role == "pserver":
all_list = self._server_endpoints
rank = self._current_id
else:
all_list = self._worker_endpoints + self._server_endpoints
rank = all_list.index(self._cur_endpoint)
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(len(all_list))
gloo.set_prefix(self._prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
if len(self._http_ip_port) != 0:
gloo.set_http_store(self._http_ip_port[0],
int(self._http_ip_port[1]), role)
else:
gloo.set_hdfs_store(self._hdfs_path + "/" + role,
self._hdfs_name, self._hdfs_ugi)
gloo.init()
return gloo
# paddlecloud support gloo
if self._role == Role.WORKER:
if self._current_id == 0 and len(self._http_ip_port) != 0:
size_d = {
"trainer": len(self._worker_endpoints),
"pserver": len(self._server_endpoints),
"all":
len(self._worker_endpoints) + len(self._server_endpoints)
}
# child process for http server
self._http_server = Process(
target=self.__start_kv_server,
args=(self._http_server_d, size_d))
self._http_server.daemon = True
# set running status to True
self._http_server_d["running"] = True
# start child process
self._http_server.start()
self._node_type = 1
gloo = init_gloo_instance("trainer")
self._node_type_comm = gloo
def _gloo_init(self):
# PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier
use_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0"))
if use_gloo not in [1, 2]:
return
# PADDLE_GLOO_RENDEZVOUS 1: HDFS 2: FILE 3: HTTP
rendezvous_type = int(os.getenv("PADDLE_GLOO_RENDEZVOUS", "0"))
prefix = os.getenv("SYS_JOB_ID", "")
if rendezvous_type not in [
Gloo.RENDEZVOUS.HDFS, Gloo.RENDEZVOUS.HTTP, Gloo.RENDEZVOUS.FILE
]:
raise ValueError(self._gloo._err_type)
need_init_all = True if use_gloo == 2 else False
if rendezvous_type == Gloo.RENDEZVOUS.HDFS:
dfs_name = os.getenv("PADDLE_GLOO_FS_NAME", "")
dfs_ugi = os.getenv("PADDLE_GLOO_FS_UGI", "")
dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "")
kwargs = {
"dfs.name": dfs_name,
"dfs.ugi": dfs_ugi,
"dfs.path": dfs_path,
"store.prefix": prefix,
}
elif rendezvous_type == Gloo.RENDEZVOUS.HTTP:
ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "")
port = os.getenv("PADDLE_GLOO_HTTP_PORT", "")
kwargs = {
"http.host": ip,
"http.port": port,
"store.prefix": prefix,
}
else:
assert self._role == Role.SERVER
self._node_type = 0
gloo = init_gloo_instance("pserver")
self._node_type_comm = gloo
all_list = self._worker_endpoints + self._server_endpoints
self._rank = all_list.index(self._cur_endpoint)
self._size = len(all_list)
gloo = init_gloo_instance("all")
self._all_comm = gloo
if self._http_server is not None:
# set running status to False
self._http_server_d["running"] = False
# wait until child process exits
self._http_server.join()
dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "")
kwargs = {
"dfs.path": dfs_path,
"store.prefix": prefix,
}
if rendezvous_type == Gloo.RENDEZVOUS.HDFS:
type = "HDFS"
elif rendezvous_type == Gloo.RENDEZVOUS.HTTP:
type = "HTTP"
else:
type = "FILE"
print("Gloo init with {}: need_init_all: {}, args: {}".format(
type, need_init_all, kwargs))
self._gloo.init(
rendezvous=rendezvous_type,
role=self._role,
role_id=self.role_id(),
worker_num=self.worker_num(),
server_num=self.server_num(),
need_init_all=need_init_all,
kwargs=kwargs)
def generate_role(self):
"""
......@@ -594,57 +845,10 @@ class PaddleCloudRoleMaker(RoleMakerBase):
if not self._role_is_generated:
if not self._is_collective:
self._ps_env()
if "PADDLE_WITH_GLOO" in os.environ:
self._init_gloo = bool(os.environ["PADDLE_WITH_GLOO"])
if self._init_gloo:
self._init_gloo_env()
else:
self._collective_env()
self._role_is_generated = True
def __get_default_iface(self):
"""
get default physical interface
"""
default1 = self.__get_default_iface_from_gateway()
default2 = self.__get_default_iface_from_interfaces()
return default2 if default1 == "lo" else default1
def __get_default_iface_from_gateway(self):
"""
get default physical interface
"""
import netifaces
gateways = netifaces.gateways()
if gateways.get(netifaces.AF_INET) != None:
gateway = gateways[netifaces.AF_INET]
if len(gateway) > 0 and len(gateway[0]) > 1:
return gateway[0][1]
return "lo"
def __get_default_iface_from_interfaces(self):
"""
get default physical interface
"""
import netifaces
for intf_name in netifaces.interfaces():
addresses = netifaces.ifaddresses(intf_name)
if netifaces.AF_INET in addresses:
ipv4_addresses = addresses[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
if 'broadcast' in ipv4_address:
return intf_name
return "lo"
def __start_kv_server(self, http_server_d, size_d):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
time.sleep(wait_seconds)
http_server.stop()
self._gloo_init()
class UserDefinedRoleMaker(PaddleCloudRoleMaker):
......@@ -677,7 +881,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
self._worker_endpoints = self._kwargs.get("worker_endpoints")
self._current_id = self._kwargs.get("current_id")
self._trainers_num = len(self._worker_endpoints)
self._training_role = Role.Worker
self._training_role = Role.WORKER
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
......@@ -688,8 +892,6 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
if not self._role_is_generated:
if not self._is_collective:
self._user_defined_ps_env()
if self._init_gloo:
self._init_gloo_env()
else:
self._user_defined_collective_env()
self._role_is_generated = True
......@@ -57,34 +57,7 @@ class UtilBase(object):
), "fs_client must be the instance of paddle.distributed.fleet.utils.FS"
self.fs_client = fs_client
def __check_comm_world(self, comm_world="worker"):
if not self.role_maker._role_is_generated:
self.role_maker.generate_role()
_comm_world = None
comm_world_upper = comm_world.upper()
if comm_world_upper == "WORKER":
if not self.role_maker.is_worker():
print(
"warning: current role is not worker in collective_func(comm_world=\"worker\")"
)
_comm_world = self.role_maker._node_type_comm
elif comm_world_upper == "SERVER":
if not self.role_maker.is_server():
print(
"warning: current role is not server in collective_func(comm_world=\"server\")"
)
_comm_world = self.role_maker._node_type_comm
elif comm_world_upper == "ALL":
_comm_world = self.role_maker._all_comm
else:
raise ValueError(
"not support comm_world, please choose one from [worker, server, all]"
)
return _comm_world
def all_reduce(self, input, mode, comm_world="worker"):
def all_reduce(self, input, mode="sum", comm_world="worker"):
"""
All reduce `input` between specified collection. This is a distributed API.
......@@ -130,8 +103,7 @@ class UtilBase(object):
if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
return self.role_maker._all_reduce(_comm_world, input, mode)
return self.role_maker._all_reduce(input, mode, comm_world)
def barrier(self, comm_world="worker"):
"""
......@@ -170,8 +142,7 @@ class UtilBase(object):
if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
self.role_maker._barrier(_comm_world)
self.role_maker._barrier(comm_world)
def all_gather(self, input, comm_world="worker"):
"""
......@@ -219,8 +190,8 @@ class UtilBase(object):
if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
return self.role_maker._all_gather(_comm_world, input)
return self.role_maker._all_gather(input, comm_world)
def _broadcast(self):
pass
......
......@@ -55,7 +55,10 @@ launch a process on each of the given gpu card or cpu machine.
"""
from __future__ import print_function
import shutil
import sys
import tempfile
from sys import version
import subprocess
import os
......@@ -213,12 +216,20 @@ def launch_collective(args):
cluster, pod = get_cluster_from_args(args, gpus)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = "1"
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "2"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
procs = start_local_trainers(
cluster,
pod,
training_script=args.training_script,
training_script_args=args.training_script_args,
log_dir=args.log_dir)
log_dir=args.log_dir,
envs=global_envs)
while True:
alive = watch_local_trainers(procs, cluster.trainers_nranks())
......@@ -230,6 +241,9 @@ def launch_collective(args):
time.sleep(3)
if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
def launch_ps(args):
ports = None
......@@ -315,6 +329,13 @@ def launch_ps(args):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
current_env["PADDLE_WITH_GLOO"] = "1"
current_env["PADDLE_GLOO_RENDEZVOUS"] = "2"
current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
procs = []
......@@ -419,6 +440,9 @@ def launch_ps(args):
procs[i].proc.terminate()
print("all parameter server are killed", file=sys.stderr)
if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
def launch():
args = _parse_args()
......
......@@ -398,8 +398,14 @@ def start_local_trainers(cluster,
pod,
training_script,
training_script_args,
log_dir=None):
current_env = copy.copy(os.environ.copy())
log_dir=None,
envs=None):
if envs is None:
current_env = copy.copy(os.environ.copy())
else:
current_env = copy.copy(envs)
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
......
......@@ -27,7 +27,7 @@ class TestFleetBase(unittest.TestCase):
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
"127.0.0.1:36001,127.0.0.2:36001"
def test_init(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
......@@ -88,7 +88,7 @@ class TestFleetBase(unittest.TestCase):
def test_util(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
self.assertEqual(fleet.util, None)
self.assertEqual(fleet.util(), None)
def test_barrier_worker(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
......@@ -99,20 +99,17 @@ class TestFleetBase(unittest.TestCase):
def test_init_worker(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.init_worker()
def test_run_server(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.run_worker()
with self.assertRaises(ValueError):
if fleet.is_worker():
fleet.init_worker()
def test_stop_worker(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.stop_worker()
with self.assertRaises(ValueError):
if fleet.is_worker():
fleet.stop_worker()
def test_distributed_optimizer(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
......
......@@ -15,7 +15,11 @@
from __future__ import print_function
import os
import platform
import shutil
import tempfile
import unittest
import paddle
import paddle.distributed.fleet.base.role_maker as role_maker
......@@ -42,9 +46,9 @@ class TestRoleMakerBase(unittest.TestCase):
self.assertTrue(len(pserver_endpoints) == 0)
print(role.to_string())
self.assertTrue(role._all_gather(role._node_type_comm, 1) is None)
self.assertTrue(role._all_reduce(role._node_type_comm, 1) is None)
role._barrier(role._node_type_comm)
self.assertTrue(role._all_gather(1, "worker") is None)
self.assertTrue(role._all_reduce(1, "sum", "worker") is None)
role._barrier("worker")
class TestCloudRoleMaker(unittest.TestCase):
......@@ -72,8 +76,8 @@ class TestCloudRoleMaker(unittest.TestCase):
print("warning: no netifaces, skip test_tr_rolemaker")
return
ro = role_maker.PaddleCloudRoleMaker(
is_collective=False, init_gloo=False)
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertTrue(ro.is_worker())
self.assertFalse(ro.is_server())
self.assertEqual(ro.worker_num(), 2)
......@@ -108,8 +112,9 @@ class TestCloudRoleMaker(unittest.TestCase):
self.assertEqual(ro.server_num(), 2)
pserver_endpoints = ro.get_pserver_endpoints()
self.assertEqual(pserver_endpoints[0], '127.0.0.1:36001')
self.assertTrue(ro._all_gather(ro._all_comm, 1) is None)
self.assertTrue(ro._all_reduce(ro._all_comm, 1) is None)
self.assertEqual(ro._all_gather(1, "worker"), 1)
self.assertEqual(ro._all_reduce(1, "sum", "worker"), 1)
def test_traing_role(self):
"""Test training role."""
......@@ -142,7 +147,7 @@ class TestUserDefinedRoleMaker(unittest.TestCase):
ro = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
server_endpoints="127.0.0.1:36001,127.0.0.1:36001",
server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"],
role=role_maker.Role.SERVER,
current_id=0,
worker_num=2)
......@@ -161,14 +166,274 @@ class TestUserDefinedRoleMaker(unittest.TestCase):
ro = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
server_endpoints="127.0.0.1:36001,127.0.0.1:36001",
server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"],
role=role_maker.Role.WORKER,
current_id=0,
worker_num=2)
self.assertIn("127.0.0.1:36001", ro.get_pserver_endpoints())
self.assertTrue(ro.is_worker())
self.assertEqual(ro.role_id(), 0)
class TestGlooWithCloudRoleMaker(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_TRAINERS_NUM"] = "1"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ID"] = "0"
def case(self, role, comm_world):
role._barrier(comm_world)
gather = role._all_gather(1, comm_world)
self.assertEqual(gather[0], 1)
all_reduce = role._all_reduce(1, "sum", comm_world)
self.assertEqual(1, all_reduce)
def mkdir(self):
tmp = tempfile.mkdtemp()
return tmp
def clean(self, tmp):
shutil.rmtree(tmp)
def test_hdfs_gloo(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "worker")
self.clean(tmp)
def test_fs_gloo(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "worker")
self.clean(tmp)
def test_fs_gloo2(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.clean(tmp)
def test_fs_gloo3(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.clean(tmp)
def test_fs_gloo4(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3"
os.environ["PADDLE_GLOO_HTTP_HOST"] = "127.0.0.1"
os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019"
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
import time
time.sleep(3)
def test_fs_gloo5(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.case(role, "all")
self.clean(tmp)
def test_fs_gloo6(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.case(role, "all")
self.clean(tmp)
def test_fs_gloo7(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5"
role = role_maker.PaddleCloudRoleMaker()
self.assertRaises(ValueError, role.generate_role)
def test_fs_gloo8(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
def net():
x = paddle.fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = paddle.fluid.layers.fc(input=x, size=1, act=None)
y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = paddle.fluid.layers.square_error_cost(
input=y_predict, label=y)
avg_cost = paddle.fluid.layers.mean(cost)
return avg_cost
from paddle.distributed import fleet
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
avg_cost = net()
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False
optimizer = paddle.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
comm_world = "server"
fleet.util().barrier(comm_world)
gather = fleet.util().all_gather(1, comm_world)
self.assertEqual(gather[0], 1)
all_reduce = fleet.util().all_reduce(1, "sum", comm_world)
self.assertEqual(1, all_reduce)
self.clean(tmp)
if __name__ == "__main__":
unittest.main()
......@@ -59,7 +59,7 @@ class TestFleetUtil(unittest.TestCase):
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
default_util = fleet.util
default_util = fleet.util()
self.assertEqual(default_util, None)
def test_set_user_defined_util(self):
......@@ -76,8 +76,8 @@ class TestFleetUtil(unittest.TestCase):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
my_util = UserDefinedUtil()
fleet.util = my_util
user_id = fleet.util.get_user_id()
fleet.set_util(my_util)
user_id = fleet.util().get_user_id()
self.assertEqual(user_id, 10)
def test_fs(self):
......@@ -88,97 +88,6 @@ class TestFleetUtil(unittest.TestCase):
self.assertFalse(fs.need_upload_download())
fleet_util._set_file_system(fs)
def test_barrier(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_barrier")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_barrier", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.SERVER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
fleet_util.barrier("worker")
def test_all_reduce(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_all_reduce")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
output = fleet_util.all_reduce(1, "sum", comm_world="server")
print(output)
# self.assertEqual(output, 1)
def test_all_gather(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_all_gather")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.SERVER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._all_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
output = fleet_util.all_gather(1, comm_world="all")
print(output)
# self.assertTrue(len(output) == 1 and output[0] == 1)
self.assertRaises(Exception, fleet_util.all_gather, 1, "test")
def download_files(self):
path = download(self.proto_data_url, self.module_name,
self.proto_data_md5)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册