diff --git a/parl/remote/job.py b/parl/remote/job.py
index d4d8133da84caf514dfd69757808faaeb541ec93..4a49d51f79ab1f68f012e3a461ae110b0e12b1fc 100644
--- a/parl/remote/job.py
+++ b/parl/remote/job.py
@@ -180,7 +180,7 @@ class Job(object):
                     logger.error(
                         "Memory used by this job exceeds {}. This job will exist."
                         .format(self.max_memory))
-                    time.sleep(3)
+                    time.sleep(5)
                     socket.close(0)
                     os._exit(1)
             except zmq.error.Again as e:
diff --git a/parl/remote/monitor.py b/parl/remote/monitor.py
index 5511bdc391ede4de89807b0b9a11a4acbbc10590..8f5c1d5f1d4b4919b4230f27a7a656a1417f6d23 100644
--- a/parl/remote/monitor.py
+++ b/parl/remote/monitor.py
@@ -59,6 +59,9 @@ class ClusterMonitor(object):
                 status = pickle.loads(msg[1])
 
                 data = {'workers': [], 'clients': []}
+                total_vacant_cpus = 0
+                total_used_cpus = 0
+
                 master_idx = None
                 for idx, worker in enumerate(status['workers'].values()):
                     worker['load_time'] = list(worker['load_time'])
@@ -66,10 +69,17 @@
                     if worker['hostname'] == 'Master':
                         master_idx = idx
                     data['workers'].append(worker)
+                    total_used_cpus += worker[
+                        'used_cpus'] if 'used_cpus' in worker else 0
+                    total_vacant_cpus += worker[
+                        'vacant_cpus'] if 'vacant_cpus' in worker else 0
+
                 if master_idx != 0 and master_idx is not None:
                     master_worker = data['workers'].pop(master_idx)
                     data['workers'] = [master_worker] + data['workers']
+                data['total_vacant_cpus'] = total_vacant_cpus
+                data['total_cpus'] = total_used_cpus + total_vacant_cpus
                 data['clients'] = list(status['clients'].values())
                 self.data = data
                 time.sleep(10)
diff --git a/parl/remote/static/js/parl.js b/parl/remote/static/js/parl.js
index a9d0d0bc1ebda01e84e634927f3b3cdf26bc56e7..117e2d5542e69213a0b4ae7e04d5b6c6533006c8 100644
--- a/parl/remote/static/js/parl.js
+++ b/parl/remote/static/js/parl.js
@@ -9,7 +9,7 @@ function createDivs(res, divs) {
         var workerDiv = document.createElement("div");
         workerDiv.id = `w${i}`;
         if (i === 0) {
-            workerDiv.innerHTML = `
-                Master
-            `;
+            workerDiv.innerHTML = `
+                Master
+                [CPU] ${res.total_vacant_cpus}/${res.total_cpus}
+            `;
         } else {
             workerDiv.innerHTML = `
                 Worker ${res.workers[i].hostname}
`; } @@ -138,6 +138,9 @@ function addPlots(res, record, imgHandle, begin, end) { ] }; + var cpuNum = document.getElementById('cpu'); + cpu.innerText = `[CPU] ${res.total_vacant_cpus}/${res.total_cpus}` + if (i < record_num && worker.hostname === record[i].hostname) { if (worker.used_cpus !== record[i].used_cpus) { imgHandle[`w${i}c0`].setOption(cpuOption); @@ -147,8 +150,10 @@ function addPlots(res, record, imgHandle, begin, end) { } imgHandle[`w${i}c2`].setOption(loadOption); } else { - var workerTitle = document.getElementById(`${i}`); - workerTitle.innerText = i===0 ? "Master" : `Worker ${worker.hostname}`; + if (i > 0){ + var workerTitle = document.getElementById(`${i}`); + workerTitle.innerText = `Worker ${worker.hostname}` + } imgHandle[`w${i}c0`].setOption(cpuOption); imgHandle[`w${i}c1`].setOption(memoryOption); imgHandle[`w${i}c2`].setOption(loadOption); diff --git a/parl/remote/tests/actor_max_memory_test.py b/parl/remote/tests/actor_max_memory_test.py index 8425b40aeaeb731d349372843774c0914e099315..ebe7f35d5c2c3a978bb8c257596797c96503ee35 100644 --- a/parl/remote/tests/actor_max_memory_test.py +++ b/parl/remote/tests/actor_max_memory_test.py @@ -26,7 +26,7 @@ from parl.remote.monitor import ClusterMonitor from multiprocessing import Process -@parl.remote_class(max_memory=300) +@parl.remote_class(max_memory=350) class Actor(object): def __init__(self, x=10): self.x = x @@ -64,10 +64,18 @@ class TestMaxMemory(unittest.TestCase): time.sleep(20) self.assertEqual(1, cluster_monitor.data['clients'][0]['actor_num']) del actor - time.sleep(5) + time.sleep(10) p = Process(target=self.actor) p.start() - time.sleep(30) + + for _ in range(6): + x = cluster_monitor.data['clients'][0]['actor_num'] + if x == 0: + break + else: + time.sleep(10) + if x == 1: + raise ValueError("Actor max memory test failed.") self.assertEqual(0, cluster_monitor.data['clients'][0]['actor_num']) p.terminate()