From c91aaced74aa1a34c8bde2e53b3072baf8012e73 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Mon, 8 Aug 2022 21:09:42 +0800
Subject: [PATCH] [LAUNCH] make launch Compatible (#44881)

* make launch compatible

* fix ut

* fix log offset
---
 .../distributed/launch/context/__init__.py    |  3 ++
 .../distributed/launch/context/args_envs.py   |  5 ++-
 .../distributed/launch/context/device.py      |  8 ++---
 .../launch/controllers/collective.py          |  4 ++-
 .../launch/controllers/controller.py          | 13 ++++----
 .../distributed/launch/controllers/ps.py      | 16 ++++-----
 .../distributed/launch/job/container.py       | 13 ++++++--
 .../paddle/fluid/tests/unittests/test_run.py  | 33 +++++++++++--------
 8 files changed, 58 insertions(+), 37 deletions(-)

diff --git a/python/paddle/distributed/launch/context/__init__.py b/python/paddle/distributed/launch/context/__init__.py
index d273d2355b3..921f653b48a 100644
--- a/python/paddle/distributed/launch/context/__init__.py
+++ b/python/paddle/distributed/launch/context/__init__.py
@@ -101,4 +101,7 @@ class Context(object):
     def set_env_in_args(self):
         for k, v in env_args_mapping.items():
             if k in self.envs:
+                print(
+                    f"LAUNCH WARNING: arg {v} is overridden by env {self.envs[k]}"
+                )
                 setattr(self.args, v, self.envs[k])
diff --git a/python/paddle/distributed/launch/context/args_envs.py b/python/paddle/distributed/launch/context/args_envs.py
index f6624e88e27..b44065c6700 100644
--- a/python/paddle/distributed/launch/context/args_envs.py
+++ b/python/paddle/distributed/launch/context/args_envs.py
@@ -96,9 +96,12 @@ def parse_args():
         help="unique id of the job. Default default")

     base_group.add_argument("--devices",
+                            "--gpus",
+                            "--npus",
+                            "--xpus",
                             type=str,
                             default=None,
-                            help="accelerate devices. as --gpus,npus,xps")
+                            help="accelerator devices; --gpus/--npus/--xpus are aliases")

     base_group.add_argument("--host", type=str, default=None, help="host ip")
diff --git a/python/paddle/distributed/launch/context/device.py b/python/paddle/distributed/launch/context/device.py
index c48ef04cd09..14997df2459 100644
--- a/python/paddle/distributed/launch/context/device.py
+++ b/python/paddle/distributed/launch/context/device.py
@@ -105,10 +105,9 @@ class Device(object):
                 os.getenv('PADDLE_XCCL_BACKEND').upper())
             if visible_devices_str in os.environ:
                 visible_devices = os.getenv(visible_devices_str)
-        elif 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ:
+        elif 'CUDA_VISIBLE_DEVICES' in os.environ:
             dev._dtype = DeviceType.GPU
-            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
-                "NVIDIA_VISIBLE_DEVICES")
+            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         elif 'XPU_VISIBLE_DEVICES' in os.environ:
             dev._dtype = DeviceType.XPU
             visible_devices = os.getenv("XPU_VISIBLE_DEVICES")
@@ -151,8 +150,7 @@ class Device(object):
         elif fluid.core.is_compiled_with_cuda():
             dev._dtype = DeviceType.GPU
             num = fluid.core.get_cuda_device_count()
-            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
-                "NVIDIA_VISIBLE_DEVICES")
+            visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         elif fluid.core.is_compiled_with_xpu():
             dev._dtype = DeviceType.XPU
             num = fluid.core.get_xpu_device_count()
diff --git a/python/paddle/distributed/launch/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py
index 8ec21f72ea4..873cfe09ac8 100644
--- a/python/paddle/distributed/launch/controllers/collective.py
+++ b/python/paddle/distributed/launch/controllers/collective.py
@@ -108,7 +108,9 @@ class CollectiveController(Controller):
             else:
                 e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
-            self.add_container(envs=e, log_tag=i)
+            # log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name, i)
+            log_file = f"workerlog.{i}"
+            self.add_container(envs=e, log_file=log_file)

         return True
diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py
index 0f0513f0a3d..56499cb6471 100644
--- a/python/paddle/distributed/launch/controllers/controller.py
+++ b/python/paddle/distributed/launch/controllers/controller.py
@@ -92,6 +92,9 @@ class ControllerBase(object):

                 self.master.set_status(status)

+                while self.pod.logs():
+                    pass
+
                 self.ctx.logger.info("Pod {}".format(status))
                 return True

@@ -105,6 +108,9 @@ class ControllerBase(object):
                 fc = self.pod.failed_container()
                 self.ctx.logger.info("Pod {}".format(status))
                 self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
+                self.ctx.logger.info(
+                    "------------------------- ERROR LOG DETAIL -------------------------"
+                )
                 fc[0].tail()

                 if self.ctx.args.elastic_level <= 0:
@@ -203,13 +209,8 @@ class Controller(ControllerBase):
                       container=None,
                       entrypoint=None,
                       envs={},
-                      log_tag=None,
+                      log_file=None,
                       is_init=False):
-        if not is_init and log_tag is not None:
-            log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name,
-                                             log_tag)
-        else:
-            log_file = None

         if not container:
             container = self.new_container(entrypoint=entrypoint,
diff --git a/python/paddle/distributed/launch/controllers/ps.py b/python/paddle/distributed/launch/controllers/ps.py
index 19429ce1961..573f578d249 100644
--- a/python/paddle/distributed/launch/controllers/ps.py
+++ b/python/paddle/distributed/launch/controllers/ps.py
@@ -84,8 +84,8 @@ class PSController(Controller):
                 "POD_IP": self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "ps.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "serverlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)

         trainer_rank_offset = 0
         for s in trainer_endpoints:
@@ -106,8 +106,8 @@ class PSController(Controller):
                 "POD_IP": self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "trainer.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "workerlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)

     def _build_pod_with_master(self):

@@ -191,8 +191,8 @@ class PSController(Controller):
                 self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "ps.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "serverlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)

         for i in range(trainer_num):
             e = {
@@ -216,8 +216,8 @@ class PSController(Controller):
                 self.ctx.node.ip,
             }
             e.update(_gloo_envs)
-            log_tag = "trainer.{}".format(i)
-            self.add_container(envs=e, log_tag=log_tag)
+            log_file = "workerlog.{}".format(i)
+            self.add_container(envs=e, log_file=log_file)
         '''
         NEW VERSION
         for i in range(server_num):
             e = {
diff --git a/python/paddle/distributed/launch/job/container.py b/python/paddle/distributed/launch/job/container.py
index e0f580da0ac..8da5363915c 100644
--- a/python/paddle/distributed/launch/job/container.py
+++ b/python/paddle/distributed/launch/job/container.py
@@ -99,7 +99,7 @@ class Container(object):
             d = os.path.dirname(pth)
             if not os.path.isdir(d):
                 os.makedirs(d, exist_ok=True)
-            return open(pth, 'w')
+            return open(pth, 'a')
         except:
             return None

@@ -115,11 +115,17 @@ class Container(object):
         elif self._err:
             self._stderr = self._get_fd(self._err) or sys.stderr

+        if not self._log_handler:
+            self._log_handler = open(self._out)
+            self._log_handler.seek(0, 2)
+            self._log_start_offset = self._log_handler.tell()
+
         self._proc = ProcessContext(self._entrypoint,
                                     env=self._env,
                                     out=self._stdout,
                                     err=self._stderr,
                                     shell=self._shell)
+
         self._proc.start()

     def terminate(self, force=False):
@@ -171,13 +177,16 @@ class Container(object):

         try:
             if offset != 0 or whence != 1:
+                if whence == 0 and offset < self._log_start_offset:
+                    offset = self._log_start_offset
                 self._log_handler.seek(offset, whence)

             for _ in range(limit):
                 line = self._log_handler.readline()
                 if not line:
-                    break
+                    return False
                 fn.write(line)
+            return True
         except:
             return
diff --git a/python/paddle/fluid/tests/unittests/test_run.py b/python/paddle/fluid/tests/unittests/test_run.py
index 75b8b169cb2..2a9f69bff1e 100644
--- a/python/paddle/fluid/tests/unittests/test_run.py
+++ b/python/paddle/fluid/tests/unittests/test_run.py
@@ -52,8 +52,8 @@ def write_file(name, ct):

 def get_files(pth, prefix):
     return [
-        f for f in listdir(pth) if isfile(join(pth, f)) and f.startswith(prefix)
-        and f != f"{prefix}.gpu.log"
+        f for f in listdir(pth)
+        if isfile(join(pth, f)) and not f.endswith('gpu.log')
     ]

@@ -101,17 +101,19 @@ class Collective_Test(unittest.TestCase):
     def test_collective_3(self):
         log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2".format(
-            log_dir.name, port)
-        p1 = self.pdrun(args)
-        p2 = self.pdrun(args)
+        args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2"
+        p1 = self.pdrun(args.format(log_dir.name + "/1", port))
+        p2 = self.pdrun(args.format(log_dir.name + "/2", port))
         p1.wait()
         p2.wait()
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)

-        c = get_files(log_dir.name, 'test3')
-        self.assertTrue(len(c) == 6)
+        c1 = get_files(log_dir.name + "/1", 'test3')
+        c2 = get_files(log_dir.name + "/2", 'test3')
+        print(c1)
+        self.assertTrue(len(c1) == 3)
+        self.assertTrue(len(c2) == 3)
         log_dir.cleanup()

@@ -156,17 +158,19 @@ class PS_Test(unittest.TestCase):
     def test_ps_3(self):
         log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format(
-            log_dir.name, port)
-        p1 = self.pdrun(args)
-        p2 = self.pdrun(args)
+        args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1"
+        p1 = self.pdrun(args.format(log_dir.name + "/1", port))
+        p2 = self.pdrun(args.format(log_dir.name + "/2", port))
         p1.wait()
         p2.wait()
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)

-        c = get_files(log_dir.name, 'ps3')
-        self.assertTrue(len(c) == 6)
+        c1 = get_files(log_dir.name + "/1", 'ps3')
+        c2 = get_files(log_dir.name + "/2", 'ps3')
+        print(c1)
+        self.assertTrue(len(c1) == 3)
+        self.assertTrue(len(c2) == 3)
         log_dir.cleanup()

     def test_ps_4(self):
@@ -178,6 +182,7 @@ class PS_Test(unittest.TestCase):
         self.assertTrue(p1.poll() == 0)

         c = get_files(log_dir.name, 'ps4')
+        print(c)
         self.assertTrue(len(c) == 5)
         log_dir.cleanup()
-- 
GitLab
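
Two notes on the mechanics above, each with a small illustrative sketch in
plain Python. Neither sketch is part of the patch; names introduced here are
hypothetical stand-ins.

The args_envs.py hunk relies on argparse accepting several option strings for
a single argument: every alias stores into the dest derived from the first
long option, so --gpus 0,1 lands in args.devices:

    import argparse

    parser = argparse.ArgumentParser()
    # same alias pattern as the patched base_group.add_argument call
    parser.add_argument("--devices", "--gpus", "--npus", "--xpus",
                        type=str, default=None,
                        help="accelerator devices")
    args = parser.parse_args(["--gpus", "0,1"])
    print(args.devices)  # prints "0,1"

The container.py changes open the worker log once at start(), seek past any
pre-existing content, and remember that position; logs() then clamps absolute
seeks to it and returns True/False so callers can drain remaining output, as
controller.py does with `while self.pod.logs(): pass`. A minimal sketch of
that pattern follows; the standalone LogTailer class is illustrative, not a
Paddle API:

    import sys

    class LogTailer(object):

        def __init__(self, path):
            # open for reading and skip whatever the file already contains,
            # mirroring _log_handler / _log_start_offset in the patch
            self._handler = open(path)
            self._handler.seek(0, 2)
            self._start_offset = self._handler.tell()

        def logs(self, fn=sys.stdout, offset=0, whence=1, limit=100):
            if offset != 0 or whence != 1:
                # never seek back into content written before startup
                if whence == 0 and offset < self._start_offset:
                    offset = self._start_offset
                self._handler.seek(offset, whence)
            for _ in range(limit):
                line = self._handler.readline()
                if not line:
                    return False  # drained; the caller's while-loop stops
                fn.write(line)
            return True  # limit reached; more lines may remain

    # usage, mirroring the drain loop added in controller.py:
    #   tailer = LogTailer("workerlog.0")
    #   while tailer.logs():
    #       pass

Returning a boolean from logs() instead of silently breaking is what lets the
controller replay everything written after startup, without re-emitting old
content when a restarted pod appends to the same file (hence open(pth, 'a')).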