# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from ..context.device import DeviceType from .controller import ControleMode, Controller class CollectiveController(Controller): @classmethod def enable(cls, ctx): # collective is the default mode if ctx: ctx.logger.debug(f"{cls.__name__} enabled") ctx.args.run_mode = ControleMode.COLLECTIVE return True else: return False def build_pod(self): if ( self.ctx.args.master is None and self.ctx.args.start_port and self.ctx.args.ips ): return self._build_pod_with_args() else: return self._build_pod_with_master() def _build_pod_with_args(self): self.pod.replicas = self.pod_replicas() start_port = int(self.ctx.args.start_port) ips = self.ctx.args.ips.split(',') job_endpoints = [ f"{h}:{p+start_port}" for h in ips for p in range(self.pod.replicas) ] self.ctx.logger.debug(f"job endpoints: {job_endpoints}") rank_offset = ( ips.index(self.ctx.node.ip) * self.pod.replicas if self.ctx.node.ip in ips else 0 ) self.save_pod_log(job_endpoints) selected_dev_key = self.ctx.node.device.get_selected_device_key() selected_dev_list = self.ctx.node.device.get_selected_devices( self.ctx.args.devices ) for i in range(self.pod.replicas): e = { "PADDLE_GLOBAL_SIZE": f"{len(job_endpoints)}", "PADDLE_LOCAL_SIZE": f"{self.pod.replicas}", "PADDLE_GLOBAL_RANK": f"{i + rank_offset}", "PADDLE_LOCAL_RANK": f"{i}", "PADDLE_NNODES": f"{len(ips)}", # compatible env "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints), "PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset], "PADDLE_TRAINER_ID": f"{i + rank_offset}", "PADDLE_TRAINERS_NUM": f"{len(job_endpoints)}", "PADDLE_RANK_IN_NODE": str(i), } if len(selected_dev_list) > 0: if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE: e.update(self.ctx.node.device.get_custom_device_envs()) if self.pod.replicas == 1: e.update({selected_dev_key: ",".join(selected_dev_list)}) else: e.update({selected_dev_key: selected_dev_list[i]}) else: e.update({'PADDLE_DISTRI_BACKEND': 'gloo'}) log_file = f"workerlog.{i}" self.add_container(envs=e, log_file=log_file) return True def _build_pod_with_master(self): self.pod.replicas = self.pod_replicas() # rank will be reset when restart self.pod.rank = int(self.ctx.args.rank) port = self.ctx.node.get_free_port() # compatible endpoints = [ f"{self.ctx.node.ip}:{p}" for p in self.ctx.node.get_free_ports(self.pod.replicas) ] data = json.dumps( { 'name': self.pod.name, 'rank': self.pod.rank, 'replicas': self.pod.replicas, 'dtype': self.ctx.node.device.dtype, 'candidate': f'{self.ctx.node.ip}:{port}', 'endpoints': ",".join(endpoints), } ) peer_list, rank = self.master.sync_peers( f'/{self.job.id}/info', self.pod.name, data, self.job.replicas, self.pod.rank, ) self.pod.rank = rank if len(peer_list) < 1: return False peer_list = [json.loads(i) for i in peer_list] self.ctx.logger.debug(f"sync peers done {peer_list}") self.save_pod_log(peer_list) global_size = sum([i['replicas'] for i in peer_list]) rank_offset = sum([i['replicas'] for i in peer_list[:rank]]) ''' The new designed collective need nothing but a master endpoint ''' collective_master = peer_list[0]['candidate'] job_endpoints = [i['endpoints'] for i in peer_list] self.pod.reset() selected_dev_key = self.ctx.node.device.get_selected_device_key() selected_dev_list = self.ctx.node.device.get_selected_devices( self.ctx.args.devices ) for i in range(self.pod.replicas): e = { "PADDLE_MASTER": collective_master, "PADDLE_GLOBAL_SIZE": f"{global_size}", "PADDLE_LOCAL_SIZE": f"{self.pod.replicas}", "PADDLE_GLOBAL_RANK": f"{i + rank_offset}", "PADDLE_LOCAL_RANK": f"{i}", "PADDLE_NNODES": f"{self.job.replicas}", # compatible env "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints), "PADDLE_CURRENT_ENDPOINT": endpoints[i], "PADDLE_TRAINER_ID": f"{i + rank_offset}", "PADDLE_TRAINERS_NUM": f"{global_size}", "PADDLE_RANK_IN_NODE": str(i), } if len(selected_dev_list) > 0: if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE: e.update(self.ctx.node.device.get_custom_device_envs()) if self.pod.replicas == 1: e.update({selected_dev_key: ",".join(selected_dev_list)}) else: e.update({selected_dev_key: selected_dev_list[i]}) else: e.update({'PADDLE_DISTRI_BACKEND': 'gloo'}) # log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name, i) log_file = f"workerlog.{i}" self.add_container(envs=e, log_file=log_file) return True class CollectiveElasticController(CollectiveController): @classmethod def enable(cls, ctx): if ctx.args.master and ctx.args.master.startswith("etcd://"): ctx.logger.debug(f"{cls.__name__} enabled") ctx.args.run_mode = ControleMode.COLLECTIVE return True else: return False def register(self): if self.job.id == 'default': self.ctx.logger.warning( 'Using default job name may cause conflict, add --job_id in args' ) self.master.register_heartbeat(self.job.id, self.pod.name) def run(self): timeout = int(self.ctx.args.elastic_timeout) timeout = timeout if self.job.elastic else timeout * 10 self.register() while self.pod.restart <= self.ctx.args.max_restart: self.build_job() self.ctx.logger.info("Waiting peer ready...") ok, replicas = self.master.wait_peer_ready( self.job.replicas_min, self.job.replicas_max, timeout ) if ok: self.job.replicas = replicas else: self.ctx.logger.warning(f"peer not ready {self.job}") break self.ctx.logger.debug(f"Run {self.job}") if not self.build_pod(): continue self.master.set_status(self.ctx.status.RUNNING) self.deploy_pod() if self.watch(): break self.ctx.logger.debug(f"Job done {self.job}")