diff --git a/python/paddle/distributed/launch/context/__init__.py b/python/paddle/distributed/launch/context/__init__.py
index d273d2355b38c464d5d3e89108f6b453da674d6f..921f653b48a6da088070ef121bad5342f0664eeb 100644
--- a/python/paddle/distributed/launch/context/__init__.py
+++ b/python/paddle/distributed/launch/context/__init__.py
@@ -101,4 +101,7 @@ class Context(object):
     def set_env_in_args(self):
         for k, v in env_args_mapping.items():
             if k in self.envs:
+                print(
+                    f"LAUNCH WARNING args {v} will be overridden by env {self.envs[k]}"
+                )
                 setattr(self.args, v, self.envs[k])
diff --git a/python/paddle/distributed/launch/context/args_envs.py b/python/paddle/distributed/launch/context/args_envs.py
index f6624e88e276db0b2a4881bb7ce76a80d3f96e2d..b44065c670005cb1a9fdbc038345d58cb6f8863d 100644
--- a/python/paddle/distributed/launch/context/args_envs.py
+++ b/python/paddle/distributed/launch/context/args_envs.py
@@ -96,9 +96,12 @@ def parse_args():
                             help="unique id of the job. Default default")
 
     base_group.add_argument("--devices",
+                            "--gpus",
+                            "--npus",
+                            "--xpus",
                             type=str,
                             default=None,
-                            help="accelerate devices. as --gpus,npus,xps")
+                            help="accelerate devices. as --gpus,npus,xpus")
 
     base_group.add_argument("--host", type=str, default=None, help="host ip")
 
diff --git a/python/paddle/distributed/launch/context/device.py b/python/paddle/distributed/launch/context/device.py
index c48ef04cd09204b56e9fe015fdd168d4ba81477f..14997df24590f80d920f2eb0f85fcd89558e05b6 100644
--- a/python/paddle/distributed/launch/context/device.py
+++ b/python/paddle/distributed/launch/context/device.py
@@ -105,10 +105,9 @@ class Device(object):
                 os.getenv('PADDLE_XCCL_BACKEND').upper())
             if visible_devices_str in os.environ:
                 visible_devices = os.getenv(visible_devices_str)
-        elif 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ:
+        elif 'CUDA_VISIBLE_DEVICES' in os.environ:
             dev._dtype = DeviceType.GPU
-            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
-                "NVIDIA_VISIBLE_DEVICES")
+            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         elif 'XPU_VISIBLE_DEVICES' in os.environ:
             dev._dtype = DeviceType.XPU
             visible_devices = os.getenv("XPU_VISIBLE_DEVICES")
@@ -151,8 +150,7 @@ class Device(object):
         elif fluid.core.is_compiled_with_cuda():
             dev._dtype = DeviceType.GPU
             num = fluid.core.get_cuda_device_count()
-            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
-                "NVIDIA_VISIBLE_DEVICES")
+            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         elif fluid.core.is_compiled_with_xpu():
             dev._dtype = DeviceType.XPU
             num = fluid.core.get_xpu_device_count()
diff --git a/python/paddle/distributed/launch/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py
index 8ec21f72ea4ded64df9b15e647f3f3dd81d4413a..873cfe09ac8b89ed486f2104607ad82675ebbe7b 100644
--- a/python/paddle/distributed/launch/controllers/collective.py
+++ b/python/paddle/distributed/launch/controllers/collective.py
@@ -108,7 +108,9 @@ class CollectiveController(Controller):
             else:
                 e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
 
-            self.add_container(envs=e, log_tag=i)
+            # log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name, i)
+            log_file = f"workerlog.{i}"
+            self.add_container(envs=e, log_file=log_file)
 
         return True
 
diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py
index 0f0513f0a3d9b5081a2e038833b32253a035f5b6..56499cb64713454e83f45f90dbe07df54f524d59 100644
--- a/python/paddle/distributed/launch/controllers/controller.py
+++ b/python/paddle/distributed/launch/controllers/controller.py
@@ -92,6 +92,9 @@ class ControllerBase(object):
 
                 self.master.set_status(status)
 
+                while self.pod.logs():
+                    pass
+
                 self.ctx.logger.info("Pod {}".format(status))
                 return True
 
@@ -105,6 +108,9 @@ class ControllerBase(object):
                 fc = self.pod.failed_container()
                 self.ctx.logger.info("Pod {}".format(status))
                 self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
+                self.ctx.logger.info(
+                    "------------------------- ERROR LOG DETAIL -------------------------"
+                )
                 fc[0].tail()
 
                 if self.ctx.args.elastic_level <= 0:
@@ -203,13 +209,8 @@ class Controller(ControllerBase):
                       container=None,
                       entrypoint=None,
                       envs={},
-                      log_tag=None,
+                      log_file=None,
                       is_init=False):
-        if not is_init and log_tag is not None:
-            log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name,
-                                             log_tag)
-        else:
-            log_file = None
 
         if not container:
             container = self.new_container(entrypoint=entrypoint,
diff --git a/python/paddle/distributed/launch/controllers/ps.py b/python/paddle/distributed/launch/controllers/ps.py
index 19429ce19614ee44a6cca938841532fb1b8df6a8..573f578d249e133b3a3f39444c93c273cb695844 100644
--- a/python/paddle/distributed/launch/controllers/ps.py
+++ b/python/paddle/distributed/launch/controllers/ps.py
@@ -84,8 +84,8 @@ class PSController(Controller):
                 "POD_IP": self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "ps.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "serverlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)
 
         trainer_rank_offset = 0
         for s in trainer_endpoints:
@@ -106,8 +106,8 @@ class PSController(Controller):
                 "POD_IP": self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "trainer.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "workerlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)
 
     def _build_pod_with_master(self):
 
@@ -191,8 +191,8 @@ class PSController(Controller):
                 self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "ps.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "serverlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)
 
         for i in range(trainer_num):
             e = {
@@ -216,8 +216,8 @@ class PSController(Controller):
                 self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "trainer.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "workerlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)
         ''' NEW VERSION
         for i in range(server_num):
             e = {
diff --git a/python/paddle/distributed/launch/job/container.py b/python/paddle/distributed/launch/job/container.py
index e0f580da0ac45d125fa8355029dba601f7519436..8da5363915ced6ac8ece264af919112e2e042f53 100644
--- a/python/paddle/distributed/launch/job/container.py
+++ b/python/paddle/distributed/launch/job/container.py
@@ -99,7 +99,7 @@ class Container(object):
             d = os.path.dirname(pth)
             if not os.path.isdir(d):
                 os.makedirs(d, exist_ok=True)
-            return open(pth, 'w')
+            return open(pth, 'a')
         except:
             return None
 
@@ -115,11 +115,17 @@ class Container(object):
         elif self._err:
             self._stderr = self._get_fd(self._err) or sys.stderr
 
+        if not self._log_handler:
+            self._log_handler = open(self._out)
+            self._log_handler.seek(0, 2)
+            self._log_start_offset = self._log_handler.tell()
+
         self._proc = ProcessContext(self._entrypoint,
                                     env=self._env,
                                     out=self._stdout,
                                     err=self._stderr,
                                     shell=self._shell)
+
         self._proc.start()
 
     def terminate(self, force=False):
@@ -171,13 +177,16 @@ class Container(object):
 
         try:
             if offset != 0 or whence != 1:
+                if whence == 0 and offset < self._log_start_offset:
+                    offset = self._log_start_offset
                 self._log_handler.seek(offset, whence)
 
             for _ in range(limit):
                 line = self._log_handler.readline()
                 if not line:
-                    break
+                    return False
                 fn.write(line)
+            return True
         except:
             return
 
diff --git a/python/paddle/fluid/tests/unittests/test_run.py b/python/paddle/fluid/tests/unittests/test_run.py
index 75b8b169cb2e1ed1e59e37d0cd7d6d330bd5261d..2a9f69bff1eba7f75d22cdf4720f968f7ca27e04 100644
--- a/python/paddle/fluid/tests/unittests/test_run.py
+++ b/python/paddle/fluid/tests/unittests/test_run.py
@@ -52,8 +52,8 @@ def write_file(name, ct):
 
 def get_files(pth, prefix):
     return [
-        f for f in listdir(pth) if isfile(join(pth, f)) and f.startswith(prefix)
-        and f != f"{prefix}.gpu.log"
+        f for f in listdir(pth)
+        if isfile(join(pth, f)) and not f.endswith('gpu.log')
     ]
 
 
@@ -101,17 +101,19 @@ class Collective_Test(unittest.TestCase):
     def test_collective_3(self):
         log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2".format(
-            log_dir.name, port)
-        p1 = self.pdrun(args)
-        p2 = self.pdrun(args)
+        args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2"
+        p1 = self.pdrun(args.format(log_dir.name + "/1", port))
+        p2 = self.pdrun(args.format(log_dir.name + "/2", port))
         p1.wait()
         p2.wait()
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)
 
-        c = get_files(log_dir.name, 'test3')
-        self.assertTrue(len(c) == 6)
+        c1 = get_files(log_dir.name + "/1", 'test3')
+        c2 = get_files(log_dir.name + "/2", 'test3')
+        print(c1)
+        self.assertTrue(len(c1) == 3)
+        self.assertTrue(len(c2) == 3)
         log_dir.cleanup()
 
 
@@ -156,17 +158,19 @@ class PS_Test(unittest.TestCase):
     def test_ps_3(self):
         log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format(
-            log_dir.name, port)
-        p1 = self.pdrun(args)
-        p2 = self.pdrun(args)
+        args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1"
+        p1 = self.pdrun(args.format(log_dir.name + "/1", port))
+        p2 = self.pdrun(args.format(log_dir.name + "/2", port))
         p1.wait()
         p2.wait()
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)
 
-        c = get_files(log_dir.name, 'ps3')
-        self.assertTrue(len(c) == 6)
+        c1 = get_files(log_dir.name + "/1", 'ps3')
+        c2 = get_files(log_dir.name + "/2", 'ps3')
+        print(c1)
+        self.assertTrue(len(c1) == 3)
+        self.assertTrue(len(c2) == 3)
         log_dir.cleanup()
 
     def test_ps_4(self):
@@ -178,6 +182,7 @@ class PS_Test(unittest.TestCase):
         self.assertTrue(p1.poll() == 0)
 
         c = get_files(log_dir.name, 'ps4')
+        print(c)
         self.assertTrue(len(c) == 5)
         log_dir.cleanup()
 
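Side note (not part of the patch above): because `Container._get_fd` now opens the per-rank log with mode `'a'` and `Container.start()` records `_log_start_offset`, a restarted container appends to the existing `workerlog.N` file but `logs()` clamps a `whence=0` seek so only the current run's output is replayed. The sketch below is a minimal, self-contained illustration of that clamping idea under assumed, hypothetical names (`log_start_offset`, `workerlog.0`); it is not the launcher's actual API.

```python
# Standalone sketch of the offset-clamping idea; names are illustrative only.
import os
import tempfile

tmpdir = tempfile.mkdtemp()
log_path = os.path.join(tmpdir, "workerlog.0")

# A previous run already wrote to the file; with mode 'a' that content survives.
with open(log_path, "a") as f:
    f.write("old run: line 1\nold run: line 2\n")

# On (re)start, the reader seeks to the end and remembers where new output
# will begin -- the analogue of _log_start_offset in the patch.
reader = open(log_path)
reader.seek(0, 2)
log_start_offset = reader.tell()

# The current run appends its own output.
with open(log_path, "a") as f:
    f.write("new run: line 1\n")

# A request to read from the start of the file (whence=0) is clamped so that
# only the current run's lines are returned.
offset, whence = 0, 0
if whence == 0 and offset < log_start_offset:
    offset = log_start_offset
reader.seek(offset, whence)
print(reader.read())  # prints only "new run: line 1"
reader.close()
```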