未验证 提交 c91aaced 编写于 作者: K kuizhiqing 提交者: GitHub

[LAUNCH] make launch Compatible (#44881)

* make launch compatible

* fix ut

* fix log offset
上级 85df6d73
...@@ -101,4 +101,7 @@ class Context(object): ...@@ -101,4 +101,7 @@ class Context(object):
def set_env_in_args(self): def set_env_in_args(self):
for k, v in env_args_mapping.items(): for k, v in env_args_mapping.items():
if k in self.envs: if k in self.envs:
print(
f"LAUNCH WARNNING args {v} is override by env {self.envs[k]}"
)
setattr(self.args, v, self.envs[k]) setattr(self.args, v, self.envs[k])
...@@ -96,9 +96,12 @@ def parse_args(): ...@@ -96,9 +96,12 @@ def parse_args():
help="unique id of the job. Default default") help="unique id of the job. Default default")
base_group.add_argument("--devices", base_group.add_argument("--devices",
"--gpus",
"--npus",
"--xpus",
type=str, type=str,
default=None, default=None,
help="accelerate devices. as --gpus,npus,xps") help="accelerate devices. as --gpus,npus,xpus")
base_group.add_argument("--host", type=str, default=None, help="host ip") base_group.add_argument("--host", type=str, default=None, help="host ip")
......
...@@ -105,10 +105,9 @@ class Device(object): ...@@ -105,10 +105,9 @@ class Device(object):
os.getenv('PADDLE_XCCL_BACKEND').upper()) os.getenv('PADDLE_XCCL_BACKEND').upper())
if visible_devices_str in os.environ: if visible_devices_str in os.environ:
visible_devices = os.getenv(visible_devices_str) visible_devices = os.getenv(visible_devices_str)
elif 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ: elif 'CUDA_VISIBLE_DEVICES' in os.environ:
dev._dtype = DeviceType.GPU dev._dtype = DeviceType.GPU
visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
"NVIDIA_VISIBLE_DEVICES")
elif 'XPU_VISIBLE_DEVICES' in os.environ: elif 'XPU_VISIBLE_DEVICES' in os.environ:
dev._dtype = DeviceType.XPU dev._dtype = DeviceType.XPU
visible_devices = os.getenv("XPU_VISIBLE_DEVICES") visible_devices = os.getenv("XPU_VISIBLE_DEVICES")
...@@ -151,8 +150,7 @@ class Device(object): ...@@ -151,8 +150,7 @@ class Device(object):
elif fluid.core.is_compiled_with_cuda(): elif fluid.core.is_compiled_with_cuda():
dev._dtype = DeviceType.GPU dev._dtype = DeviceType.GPU
num = fluid.core.get_cuda_device_count() num = fluid.core.get_cuda_device_count()
visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
"NVIDIA_VISIBLE_DEVICES")
elif fluid.core.is_compiled_with_xpu(): elif fluid.core.is_compiled_with_xpu():
dev._dtype = DeviceType.XPU dev._dtype = DeviceType.XPU
num = fluid.core.get_xpu_device_count() num = fluid.core.get_xpu_device_count()
......
...@@ -108,7 +108,9 @@ class CollectiveController(Controller): ...@@ -108,7 +108,9 @@ class CollectiveController(Controller):
else: else:
e.update({'PADDLE_DISTRI_BACKEND': 'gloo'}) e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
self.add_container(envs=e, log_tag=i) # log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name, i)
log_file = f"workerlog.{i}"
self.add_container(envs=e, log_file=log_file)
return True return True
......
...@@ -92,6 +92,9 @@ class ControllerBase(object): ...@@ -92,6 +92,9 @@ class ControllerBase(object):
self.master.set_status(status) self.master.set_status(status)
while self.pod.logs():
pass
self.ctx.logger.info("Pod {}".format(status)) self.ctx.logger.info("Pod {}".format(status))
return True return True
...@@ -105,6 +108,9 @@ class ControllerBase(object): ...@@ -105,6 +108,9 @@ class ControllerBase(object):
fc = self.pod.failed_container() fc = self.pod.failed_container()
self.ctx.logger.info("Pod {}".format(status)) self.ctx.logger.info("Pod {}".format(status))
self.ctx.logger.error("Container failed !!!\n{}".format(fc[0])) self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
self.ctx.logger.info(
"------------------------- ERROR LOG DETAIL -------------------------"
)
fc[0].tail() fc[0].tail()
if self.ctx.args.elastic_level <= 0: if self.ctx.args.elastic_level <= 0:
...@@ -203,13 +209,8 @@ class Controller(ControllerBase): ...@@ -203,13 +209,8 @@ class Controller(ControllerBase):
container=None, container=None,
entrypoint=None, entrypoint=None,
envs={}, envs={},
log_tag=None, log_file=None,
is_init=False): is_init=False):
if not is_init and log_tag is not None:
log_file = "{}.{}.{}.log".format(self.job.id, self.pod.name,
log_tag)
else:
log_file = None
if not container: if not container:
container = self.new_container(entrypoint=entrypoint, container = self.new_container(entrypoint=entrypoint,
......
...@@ -84,8 +84,8 @@ class PSController(Controller): ...@@ -84,8 +84,8 @@ class PSController(Controller):
"POD_IP": self.ctx.node.ip, "POD_IP": self.ctx.node.ip,
} }
e.update(_gloo_envs) e.update(_gloo_envs)
log_tag = "ps.{}".format(i) log_file = "serverlog.{}".format(i)
self.add_container(envs=e, log_tag=log_tag) self.add_container(envs=e, log_file=log_file)
trainer_rank_offset = 0 trainer_rank_offset = 0
for s in trainer_endpoints: for s in trainer_endpoints:
...@@ -106,8 +106,8 @@ class PSController(Controller): ...@@ -106,8 +106,8 @@ class PSController(Controller):
"POD_IP": self.ctx.node.ip, "POD_IP": self.ctx.node.ip,
} }
e.update(_gloo_envs) e.update(_gloo_envs)
log_tag = "trainer.{}".format(i) log_file = "workerlog.{}".format(i)
self.add_container(envs=e, log_tag=log_tag) self.add_container(envs=e, log_file=log_file)
def _build_pod_with_master(self): def _build_pod_with_master(self):
...@@ -191,8 +191,8 @@ class PSController(Controller): ...@@ -191,8 +191,8 @@ class PSController(Controller):
self.ctx.node.ip, self.ctx.node.ip,
} }
e.update(_gloo_envs) e.update(_gloo_envs)
log_tag = "ps.{}".format(i) log_file = "serverlog.{}".format(i)
self.add_container(envs=e, log_tag=log_tag) self.add_container(envs=e, log_file=log_file)
for i in range(trainer_num): for i in range(trainer_num):
e = { e = {
...@@ -216,8 +216,8 @@ class PSController(Controller): ...@@ -216,8 +216,8 @@ class PSController(Controller):
self.ctx.node.ip, self.ctx.node.ip,
} }
e.update(_gloo_envs) e.update(_gloo_envs)
log_tag = "trainer.{}".format(i) log_file = "workerlog.{}".format(i)
self.add_container(envs=e, log_tag=log_tag) self.add_container(envs=e, log_file=log_file)
''' NEW VERSION ''' NEW VERSION
for i in range(server_num): for i in range(server_num):
e = { e = {
......
...@@ -99,7 +99,7 @@ class Container(object): ...@@ -99,7 +99,7 @@ class Container(object):
d = os.path.dirname(pth) d = os.path.dirname(pth)
if not os.path.isdir(d): if not os.path.isdir(d):
os.makedirs(d, exist_ok=True) os.makedirs(d, exist_ok=True)
return open(pth, 'w') return open(pth, 'a')
except: except:
return None return None
...@@ -115,11 +115,17 @@ class Container(object): ...@@ -115,11 +115,17 @@ class Container(object):
elif self._err: elif self._err:
self._stderr = self._get_fd(self._err) or sys.stderr self._stderr = self._get_fd(self._err) or sys.stderr
if not self._log_handler:
self._log_handler = open(self._out)
self._log_handler.seek(0, 2)
self._log_start_offset = self._log_handler.tell()
self._proc = ProcessContext(self._entrypoint, self._proc = ProcessContext(self._entrypoint,
env=self._env, env=self._env,
out=self._stdout, out=self._stdout,
err=self._stderr, err=self._stderr,
shell=self._shell) shell=self._shell)
self._proc.start() self._proc.start()
def terminate(self, force=False): def terminate(self, force=False):
...@@ -171,13 +177,16 @@ class Container(object): ...@@ -171,13 +177,16 @@ class Container(object):
try: try:
if offset != 0 or whence != 1: if offset != 0 or whence != 1:
if whence == 0 and offset < self._log_start_offset:
offset = self._log_start_offset
self._log_handler.seek(offset, whence) self._log_handler.seek(offset, whence)
for _ in range(limit): for _ in range(limit):
line = self._log_handler.readline() line = self._log_handler.readline()
if not line: if not line:
break return False
fn.write(line) fn.write(line)
return True
except: except:
return return
......
...@@ -52,8 +52,8 @@ def write_file(name, ct): ...@@ -52,8 +52,8 @@ def write_file(name, ct):
def get_files(pth, prefix): def get_files(pth, prefix):
return [ return [
f for f in listdir(pth) if isfile(join(pth, f)) and f.startswith(prefix) f for f in listdir(pth)
and f != f"{prefix}.gpu.log" if isfile(join(pth, f)) and not f.endswith('gpu.log')
] ]
...@@ -101,17 +101,19 @@ class Collective_Test(unittest.TestCase): ...@@ -101,17 +101,19 @@ class Collective_Test(unittest.TestCase):
def test_collective_3(self): def test_collective_3(self):
log_dir = tempfile.TemporaryDirectory() log_dir = tempfile.TemporaryDirectory()
port = random.randrange(6000, 8000) port = random.randrange(6000, 8000)
args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2".format( args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2"
log_dir.name, port) p1 = self.pdrun(args.format(log_dir.name + "/1", port))
p1 = self.pdrun(args) p2 = self.pdrun(args.format(log_dir.name + "/2", port))
p2 = self.pdrun(args)
p1.wait() p1.wait()
p2.wait() p2.wait()
self.assertTrue(p1.poll() == 0) self.assertTrue(p1.poll() == 0)
self.assertTrue(p2.poll() == 0) self.assertTrue(p2.poll() == 0)
c = get_files(log_dir.name, 'test3') c1 = get_files(log_dir.name + "/1", 'test3')
self.assertTrue(len(c) == 6) c2 = get_files(log_dir.name + "/2", 'test3')
print(c1)
self.assertTrue(len(c1) == 3)
self.assertTrue(len(c2) == 3)
log_dir.cleanup() log_dir.cleanup()
...@@ -156,17 +158,19 @@ class PS_Test(unittest.TestCase): ...@@ -156,17 +158,19 @@ class PS_Test(unittest.TestCase):
def test_ps_3(self): def test_ps_3(self):
log_dir = tempfile.TemporaryDirectory() log_dir = tempfile.TemporaryDirectory()
port = random.randrange(6000, 8000) port = random.randrange(6000, 8000)
args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format( args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1"
log_dir.name, port) p1 = self.pdrun(args.format(log_dir.name + "/1", port))
p1 = self.pdrun(args) p2 = self.pdrun(args.format(log_dir.name + "/2", port))
p2 = self.pdrun(args)
p1.wait() p1.wait()
p2.wait() p2.wait()
self.assertTrue(p1.poll() == 0) self.assertTrue(p1.poll() == 0)
self.assertTrue(p2.poll() == 0) self.assertTrue(p2.poll() == 0)
c = get_files(log_dir.name, 'ps3') c1 = get_files(log_dir.name + "/1", 'ps3')
self.assertTrue(len(c) == 6) c2 = get_files(log_dir.name + "/2", 'ps3')
print(c1)
self.assertTrue(len(c1) == 3)
self.assertTrue(len(c2) == 3)
log_dir.cleanup() log_dir.cleanup()
def test_ps_4(self): def test_ps_4(self):
...@@ -178,6 +182,7 @@ class PS_Test(unittest.TestCase): ...@@ -178,6 +182,7 @@ class PS_Test(unittest.TestCase):
self.assertTrue(p1.poll() == 0) self.assertTrue(p1.poll() == 0)
c = get_files(log_dir.name, 'ps4') c = get_files(log_dir.name, 'ps4')
print(c)
self.assertTrue(len(c) == 5) self.assertTrue(len(c) == 5)
log_dir.cleanup() log_dir.cleanup()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册