# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Defination of Role Makers.""" import os import time import numpy as np import warnings from multiprocessing import Process, Manager import paddle.fluid as fluid class Role: WORKER = 1 SERVER = 2 HETER_WORKER = 3 ALL = 4 class Gloo(object): """ Gloo is a universal class for barrier and collective communication """ class RENDEZVOUS: HDFS = 1 FILE = 2 HTTP = 3 def __init__(self): self._worker_comm = None self._server_comm = None self._nodes_comm = None self._comm_world = ["worker", "server", "all"] self._err_init = "gloo is not initialized, will not communicator with other nodes" self._err_type = "gloo initialized error, please check arguments" self._err_world = "argument error, comm_world must in {}".format( self._comm_world) self._is_initialized = False self._init_timeout_seconds = 3600 self._run_timeout_seconds = 9999999 self._rendezvous = None self._role = None self._iface = None self._role_id = -1 self._worker_num = -1 self._server_num = -1 self._need_init_all = False def init(self, rendezvous, role, role_id, worker_num, server_num, need_init_all=False, kwargs=None): self._rendezvous = rendezvous self._role = role self._role_id = role_id self._worker_num = worker_num self._server_num = server_num self._need_init_all = need_init_all self._iface = self.__get_default_iface() self._prefix = kwargs.get("store.prefix", "") if self._rendezvous == Gloo.RENDEZVOUS.HDFS: dfs_name = kwargs.get("dfs.name", "") dfs_ugi = kwargs.get("dfs.ugi", "") dfs_path = kwargs.get("dfs.path", "") if not dfs_name or not dfs_ugi or not dfs_path: raise ValueError(self._err_type) self._init_dfs(dfs_name, dfs_ugi, dfs_path, self._prefix) elif self._rendezvous == Gloo.RENDEZVOUS.FILE: fs_path = kwargs.get("dfs.path", "") if not fs_path: raise ValueError(self._err_type) self._init_fs(fs_path, self._prefix) elif self._rendezvous == Gloo.RENDEZVOUS.HTTP: ip = kwargs.get("http.host", "") port = kwargs.get("http.port", "") if not ip or not port: raise ValueError(self._err_type) self._init_http(ip, port, self._prefix) else: raise ValueError(self._err_type) self._is_initialized = True def _init_fs(self, fs_path, prefix): def init(rank, nodes, role): gloo = fluid.core.Gloo() gloo.set_rank(rank) gloo.set_size(nodes) gloo.set_prefix(prefix) gloo.set_iface(self._iface) gloo.set_timeout_seconds(self._init_timeout_seconds, self._run_timeout_seconds) gloo.set_hdfs_store(os.path.join(fs_path, role), "", "") gloo.init() return gloo if self._role == Role.WORKER: rank, nodes = self._get_rank_nodes(Role.WORKER) gloo = init(rank, nodes, "WORKER") self._worker_comm = gloo else: rank, nodes = self._get_rank_nodes(Role.SERVER) gloo = init(rank, nodes, "SERVER") self._server_comm = gloo if self._need_init_all: rank, nodes = self._get_rank_nodes(Role.ALL) gloo = init(rank, nodes, "ALL") self._nodes_comm = gloo def _init_dfs(self, dfs_name, dfs_ugi, dfs_path, prefix): def init(rank, nodes, role): gloo = fluid.core.Gloo() gloo.set_rank(rank) gloo.set_size(nodes) gloo.set_prefix(prefix) gloo.set_iface(self._iface) gloo.set_timeout_seconds(self._init_timeout_seconds, self._run_timeout_seconds) gloo.set_hdfs_store(os.path.join(dfs_path, role), dfs_name, dfs_ugi) gloo.init() return gloo if self._role == Role.WORKER: rank, nodes = self._get_rank_nodes(Role.WORKER) gloo = init(rank, nodes, "WORKER") self._worker_comm = gloo else: rank, nodes = self._get_rank_nodes(Role.SERVER) gloo = init(rank, nodes, "SERVER") self._server_comm = gloo if self._need_init_all: rank, nodes = self._get_rank_nodes(Role.ALL) gloo = init(rank, nodes, "ALL") self._nodes_comm = gloo def _init_http(self, ip, port, prefix): def __start_kv_server(http_server_d, size_d): from paddle.distributed.fleet.utils.http_server import KVServer http_server = KVServer(port, size_d) http_server.start() wait_seconds = 5 while http_server_d.get("running", False) and not http_server.shoud_stop(): time.sleep(wait_seconds) http_server.stop() def init_kv_server(): size_d = { "trainer": self._worker_num, "pserver": self._server_num, "all": self._worker_num + self._server_num } _http_server_d = {"running": True} # child process for http server _http_server = Process( target=__start_kv_server, args=(_http_server_d, size_d)) _http_server.daemon = True # set running status to True # start child process _http_server.start() def init(rank, nodes, role): gloo = fluid.core.Gloo() gloo.set_rank(rank) gloo.set_size(nodes) gloo.set_prefix(prefix) gloo.set_iface(self._iface) gloo.set_timeout_seconds(self._init_timeout_seconds, self._run_timeout_seconds) gloo.set_http_store(ip, port, role) return gloo port = int(port) if self._role == Role.SERVER and self._role_id == 0: init_kv_server() if self._role == Role.WORKER: rank, nodes = self._get_rank_nodes(Role.WORKER) gloo = init(rank, nodes, "WORKER") self._worker_comm = gloo else: rank, nodes = self._get_rank_nodes(Role.SERVER) gloo = init(rank, nodes, "SERVER") self._server_comm = gloo if self._need_init_all: rank, nodes = self._get_rank_nodes(Role.ALL) gloo = init(rank, nodes, "ALL") self._nodes_comm = gloo def _get_rank_nodes(self, role): nodes = 0 rank = -1 if role == Role.WORKER: nodes = self._worker_num rank = self._role_id elif role == Role.SERVER: nodes = self._server_num rank = self._role_id elif role == Role.ALL: nodes = self._worker_num + self._server_num if self._role == Role.WORKER: rank = self._role_id else: rank = self._worker_num + self._role_id else: ValueError(self._err_type) return rank, nodes def __get_default_iface(self): """ get default physical interface """ default1 = self.__get_default_iface_from_gateway() default2 = self.__get_default_iface_from_interfaces() return default2 if default1 == "lo" else default1 def __get_default_iface_from_gateway(self): """ get default physical interface """ import netifaces gateways = netifaces.gateways() if gateways.get(netifaces.AF_INET) != None: gateway = gateways[netifaces.AF_INET] if len(gateway) > 0 and len(gateway[0]) > 1: return gateway[0][1] return "lo" def __get_default_iface_from_interfaces(self): """ get default physical interface """ import netifaces for intf_name in netifaces.interfaces(): addresses = netifaces.ifaddresses(intf_name) if netifaces.AF_INET in addresses: ipv4_addresses = addresses[netifaces.AF_INET] for ipv4_address in ipv4_addresses: if 'broadcast' in ipv4_address: return intf_name return "lo" def barrier(self, comm_world): """ dummy barrier, do nothing """ if not self._is_initialized: warnings.warn(self._err_init) return if comm_world not in self._comm_world: raise ValueError(self._err_world) if comm_world == "worker": self._worker_comm.barrier() elif comm_world == "server": self._server_comm.barrier() else: self._nodes_comm.barrier() def all_reduce(self, input, mode="sum", comm_world="worker"): if not self._is_initialized: warnings.warn(self._err_init) return input if comm_world not in self._comm_world: raise ValueError(self._err_world) input = np.array(input) input_shape = input.shape input_list = input.reshape(-1).tolist() self.barrier(comm_world) if comm_world == "worker": ans = self._worker_comm.all_reduce(input_list, mode) elif comm_world == "server": ans = self._server_comm.all_reduce(input_list, mode) else: ans = self._nodes_comm.all_reduce(input_list, mode) output = np.array(ans).reshape(input_shape) return output def all_gather(self, input, comm_world="worker"): """ dummy all gather, do nothing Args: obj(any): obj to do all gather """ if not self._is_initialized: warnings.warn(self._err_init) return input if comm_world not in self._comm_world: raise ValueError(self._err_world) if comm_world == "worker": output = self._worker_comm.all_gather(input) elif comm_world == "server": output = self._server_comm.all_gather(input) else: output = self._nodes_comm.all_gather(input) return output class RoleMakerBase(object): """ RoleMakerBase is a base class for assigning a role to current process in distributed training. A paddle developer can implement RoleMakerBase to design a role maker for worker or pserver assignment. """ def __init__(self): self._worker_endpoints = [] self._server_endpoints = [] self._role_is_generated = False self._role = None self._current_id = -1 # for heter parameter server mode self._heter_trainer_endpoints = [] self._heter_trainer_device = "CPU" self._is_heter_parameter_server_mode = False def _is_worker(self): """ return is_worker() of current process """ raise NotImplementedError("Please implement this method in child class") def _is_server(self): """ return is_server() of current process """ raise NotImplementedError("Please implement this method in child class") def _is_first_worker(self): """ Check whether the node is the first instance of worker. Returns: bool: True if this is the first node of worker, False if not. """ raise NotImplementedError("Please implement this method in child class") def _worker_num(self): """ Get current total worker number. Returns: int: worker number """ raise NotImplementedError("Please implement this method in child class") def _server_num(self): """ Get current total server number. Returns: int: server number """ raise NotImplementedError("Please implement this method in child class") def _worker_index(self): """ Get current worker id. Returns: int: node id """ raise NotImplementedError("Please implement this method in child class") def _server_index(self): """ Get current server id. Returns: int: node id """ raise NotImplementedError("Please implement this method in child class") def _role_id(self): """ Get current id. Returns: int: node id """ raise NotImplementedError("Please implement this method in child class") def _node_num(self): """ Get the training node number Returns: int: node num """ raise NotImplementedError("Please implement this method in child class") def _get_trainer_endpoints(self): """ return trainer endpoints """ return self._worker_endpoints def _get_pserver_endpoints(self): """ return pserver endpoints """ return self._server_endpoints def to_string(self): return "role: {}, current_id: {}, worker_endpoints: {}, server_endpoints: {}".format( self._role, self._current_id, self._worker_endpoints, self._server_endpoints) def _all_gather(self, input, comm_world="worker"): print("warning: RoleMakerBase does not have all gather worker.") return None def _all_reduce(self, input, mode="sum", comm_world="worker"): """ Args: input(list/numpy.array): array of one dim output(list/numpy.array): array of one dim mode(str): "sum" or "min" or "max" """ print("warning: RoleMakerBase does not have all reduce worker.") return None def _barrier(self, comm_world): """ barrier between trainers if current role is TRAINER """ print("warning: RoleMakerBase does not have barrier worker.") def _is_heter_worker(self): """ Return is_heter_worker() of current process """ warnings.warn("RoleMakerBase does not have function: _is_heter_worker.") return False def _heter_worker_num(self): """ Get current total heter-worker number. Returns: int: heter_worker number """ warnings.warn( "RoleMakerBase does not have function: _heter_worker_num.") return 0 def _get_heter_worker_endpoints(self): """ Returns: string: all heter_trainers'endpoints """ assert self._heter_trainer_endpoints != [] return self._heter_trainer_endpoints def _get_heter_worker_endpoint(self): """ Returns: int: corresponding heter_trainer's endpoint e.g: if we have 4 cpu-trainer(default), 2 gpu-trainer(heter) then No.0 and No.2 cpu-trainer will work with No.0 gpu-trainer and No.1 and No.3 cpu-trainer will work with No.1 gpu-trainerr """ assert self._heter_trainer_endpoints != [] return self._heter_trainer_endpoints[(self._current_id) % self._heter_worker_num()] def _get_heter_worker_device(self): """ Returns: string: heter_trainer's device of current node, e.g: CPU/GPU/XPU """ return self._heter_trainer_device.upper() class PaddleCloudRoleMaker(RoleMakerBase): def __init__(self, is_collective=False, **kwargs): super(PaddleCloudRoleMaker, self).__init__() self._is_collective = is_collective self._non_distributed = False self._kwargs = kwargs self._role_is_generated = False self._server_endpoints = None self._worker_endpoints = None self._gloo = Gloo() # gloo instance def _barrier(self, comm_world): self._gloo.barrier(comm_world) def _all_gather(self, input, comm_world="worker"): return self._gloo.all_gather(input, comm_world) def _all_reduce(self, input, mode="sum", comm_world="worker"): return self._gloo.all_reduce(input, mode, comm_world) def _is_worker(self): """ whether current process is worker """ if not self._role_is_generated: self._generate_role() return self._role == Role.WORKER def _is_server(self): """ whether current process is server """ if not self._role_is_generated: self._generate_role() return self._role == Role.SERVER def _is_first_worker(self): """ whether current process is worker of rank 0 """ if not self._role_is_generated: self._generate_role() return self._role == Role.WORKER and self._current_id == 0 def _worker_index(self): """ get index of current worker """ if not self._role_is_generated: self._generate_role() return self._current_id def _server_index(self): """ get index of current server """ if not self._role_is_generated: self._generate_role() return self._current_id def _role_id(self): """ get index of current node """ if not self._role_is_generated: self._generate_role() return self._current_id def _worker_num(self): """ retrun the current number of worker """ if not self._role_is_generated: self._generate_role() return self._trainers_num def _server_num(self): """ return the current number of server """ if not self._role_is_generated: self._generate_role() return len(self._get_pserver_endpoints()) def _node_num(self): """ return the training node number """ if not self._role_is_generated: self._generate_role() return self._nodes_num def _get_trainer_endpoints(self): """ get endpoint of all trainers """ if not self._role_is_generated: self._generate_role() return self._worker_endpoints def _get_pserver_endpoints(self): """ get endpoint of all pservers """ if not self._role_is_generated: self._generate_role() return self._server_endpoints def _is_non_distributed(self): """ Return True if indispensable environment for fleetrun is not found (use python-run to launch fleet-code directly) """ if not self._role_is_generated: self._generate_role() return self._non_distributed def _heter_worker_num(self): """ get heter worker nums """ if not self._role_is_generated: self._generate_role() return self._heter_trainers_num def _is_heter_worker(self): """ whether current process is heter worker """ if not self._role_is_generated: self._generate_role() return self._role == Role.HETER_WORKER def _ps_env(self): # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002 self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST", None) if self._server_endpoints is None: # back to non_distributed execution. self._server_endpoints = "" self._trainers_num = 1 self._role = Role.WORKER self._current_id = 0 self._nodes_num = 1 self._heter_trainers_num = 0 self._heter_trainer_endpoints = None self._non_distributed = True return self._server_endpoints = self._server_endpoints.split(",") self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", None) if self._worker_endpoints != None: self._worker_endpoints = self._worker_endpoints.split(",") else: self._worker_endpoints = [] trainers_num = os.getenv("PADDLE_TRAINERS_NUM", None) assert trainers_num != None trainers_num = int(trainers_num) training_role = os.getenv("TRAINING_ROLE", None) assert training_role != None if training_role not in ["TRAINER", "PSERVER", "HETER_TRAINER"]: raise ValueError( "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER, but get {}, please check your environment.". format(training_role)) # For heter parameter server env setting heter_trainer_eplist = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST", "") heter_trainer_device = os.getenv("PADDLE_HETER_TRAINER_DEVICE", "") if heter_trainer_eplist != "" and heter_trainer_device != "": try: heter_trainer_eplist = os.environ[ "PADDLE_HETER_TRAINER_IP_PORT_LIST"].split(",") except: raise ValueError( "Can not Find PADDLE_HETER_TRAINER_IP_PORT_LIST in env or its format doesn't match the requirement: 'IP:PORT,IP:PORT' ." ) self._is_heter_parameter_server_mode = True heter_trainers_num = len(heter_trainer_eplist) current_node_device = heter_trainer_device.upper() if current_node_device not in ["CPU", "GPU", "XPU"]: raise ValueError( "Heter Trainer doesn't support {} device now, please use CPU / GPU / XPU(KunLun)". format(heter_trainer_device)) self._heter_trainer_device = current_node_device else: self._is_heter_parameter_server_mode = False heter_trainers_num = 0 if training_role == "TRAINER": role = Role.WORKER current_id = os.getenv("PADDLE_TRAINER_ID", None) assert current_id != None current_id = int(current_id) if len(self._worker_endpoints) > 0: self._cur_endpoint = self._worker_endpoints[current_id] elif training_role == "PSERVER": role = Role.SERVER port = os.getenv("PADDLE_PORT", None) assert port != None ip = os.getenv("POD_IP", None) assert ip != None self._cur_endpoint = ip + ":" + port current_id = self._server_endpoints.index(self._cur_endpoint) elif training_role == "HETER_TRAINER": role = Role.HETER_WORKER cur_port = os.getenv("PADDLE_PORT", None) assert cur_port != None cur_ip = os.getenv("POD_IP", None) assert cur_ip != None curr_endpoint = ":".join([cur_ip, cur_port]) current_id = heter_trainer_eplist.index(curr_endpoint) else: raise ValueError( "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER") self._trainers_num = trainers_num self._role = role self._current_id = current_id self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) self._heter_trainers_num = heter_trainers_num self._heter_trainer_endpoints = heter_trainer_eplist def _collective_env(self): self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER") assert (self._training_role == "TRAINER") self._role = Role.WORKER self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") self._cur_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") if self._worker_endpoints is None: # back to non_distributed execution. self._worker_endpoints = "127.0.0.1:6170" self._cur_endpoint = self._worker_endpoints self._non_distributed = True self._worker_endpoints = self._worker_endpoints.split(",") self._trainers_num = len(self._worker_endpoints) self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) def _gloo_init(self): # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier use_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0")) if use_gloo not in [1, 2]: return # PADDLE_GLOO_RENDEZVOUS 1: HDFS 2: FILE 3: HTTP rendezvous_type = int(os.getenv("PADDLE_GLOO_RENDEZVOUS", "0")) prefix = os.getenv("SYS_JOB_ID", "") if rendezvous_type not in [ Gloo.RENDEZVOUS.HDFS, Gloo.RENDEZVOUS.HTTP, Gloo.RENDEZVOUS.FILE ]: raise ValueError(self._gloo._err_type) need_init_all = True if use_gloo == 2 else False if rendezvous_type == Gloo.RENDEZVOUS.HDFS: dfs_name = os.getenv("PADDLE_GLOO_FS_NAME", "") dfs_ugi = os.getenv("PADDLE_GLOO_FS_UGI", "") dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") kwargs = { "dfs.name": dfs_name, "dfs.ugi": dfs_ugi, "dfs.path": dfs_path, "store.prefix": prefix, } elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "") port = os.getenv("PADDLE_GLOO_HTTP_PORT", "") kwargs = { "http.host": ip, "http.port": port, "store.prefix": prefix, } else: dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") kwargs = { "dfs.path": dfs_path, "store.prefix": prefix, } if rendezvous_type == Gloo.RENDEZVOUS.HDFS: type = "HDFS" elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: type = "HTTP" else: type = "FILE" print("Gloo init with {}: need_init_all: {}, args: {}".format( type, need_init_all, kwargs)) self._gloo.init( rendezvous=rendezvous_type, role=self._role, role_id=self._role_id(), worker_num=self._worker_num(), server_num=self._server_num(), need_init_all=need_init_all, kwargs=kwargs) def _generate_role(self): """ generate role for role maker """ if not self._role_is_generated: if not self._is_collective: self._ps_env() else: self._collective_env() self._role_is_generated = True self._gloo_init() class UserDefinedRoleMaker(PaddleCloudRoleMaker): def __init__(self, is_collective=False, init_gloo=False, **kwargs): super(UserDefinedRoleMaker, self).__init__( is_collective=is_collective, init_gloo=init_gloo, **kwargs) self._init_gloo = init_gloo def _user_defined_ps_env(self): self._server_endpoints = self._kwargs.get("server_endpoints") self._worker_endpoints = self._kwargs.get("worker_endpoints", []) self._trainers_num = self._kwargs.get("worker_num", 0) if self._trainers_num == 0: assert (len(self._worker_endpoints) > 0) self._trainers_num = len(self._worker_endpoints) self._role = self._kwargs.get("role") self._current_id = self._kwargs.get("current_id") if self._role == Role.WORKER and len( self._worker_endpoints) > self._current_id: self._cur_endpoint = self._worker_endpoints[self._current_id] elif self._role == Role.SERVER: self._cur_endpoint = self._server_endpoints[self._current_id] self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) def _user_defined_collective_env(self): self._worker_endpoints = self._kwargs.get("worker_endpoints") self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) self._training_role = Role.WORKER self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) def _generate_role(self): """ generate role for role maker """ if not self._role_is_generated: if not self._is_collective: self._user_defined_ps_env() else: self._user_defined_collective_env() self._role_is_generated = True