提交 001c4dba 编写于 作者: F fuyw 提交者: Bo Zhou

Add total CPU nums to monitor (#140)

* Add total CPU nums to monitor

* Fix unittest KeyError

* yapf

* Fix test_max_memory error by adding more sleep time

* Add more sleep time in job._reply_client_heartbeat

* Check the actor number up to 6 times, sleeping between attempts.
上级 a8846355
......@@ -180,7 +180,7 @@ class Job(object):
logger.error(
"Memory used by this job exceeds {}. This job will exist."
.format(self.max_memory))
time.sleep(3)
time.sleep(5)
socket.close(0)
os._exit(1)
except zmq.error.Again as e:
......
......@@ -59,6 +59,9 @@ class ClusterMonitor(object):
status = pickle.loads(msg[1])
data = {'workers': [], 'clients': []}
total_vacant_cpus = 0
total_used_cpus = 0
master_idx = None
for idx, worker in enumerate(status['workers'].values()):
worker['load_time'] = list(worker['load_time'])
......@@ -66,10 +69,17 @@ class ClusterMonitor(object):
if worker['hostname'] == 'Master':
master_idx = idx
data['workers'].append(worker)
total_used_cpus += worker[
'used_cpus'] if 'used_cpus' in worker else 0
total_vacant_cpus += worker[
'vacant_cpus'] if 'vacant_cpus' in worker else 0
if master_idx != 0 and master_idx is not None:
master_worker = data['workers'].pop(master_idx)
data['workers'] = [master_worker] + data['workers']
data['total_vacant_cpus'] = total_vacant_cpus
data['total_cpus'] = total_used_cpus + total_vacant_cpus
data['clients'] = list(status['clients'].values())
self.data = data
time.sleep(10)
......
......@@ -9,7 +9,7 @@ function createDivs(res, divs) {
var workerDiv = document.createElement("div");
workerDiv.id = `w${i}`;
if (i === 0) {
workerDiv.innerHTML = `<p class="card-header" id="${i}">Master</p>`;
workerDiv.innerHTML = `<div class="card-header" id="${i}"><div style="display:inline;">Master</div><div style="float:right; display:inlene;" id="cpu">[CPU] ${res.total_vacant_cpus}/${res.total_cpus}</div></div>`;
} else {
workerDiv.innerHTML = `<p class="card-header" id="${i}">Worker ${res.workers[i].hostname}</p>`;
}
......@@ -138,6 +138,9 @@ function addPlots(res, record, imgHandle, begin, end) {
]
};
var cpuNum = document.getElementById('cpu');
cpu.innerText = `[CPU] ${res.total_vacant_cpus}/${res.total_cpus}`
if (i < record_num && worker.hostname === record[i].hostname) {
if (worker.used_cpus !== record[i].used_cpus) {
imgHandle[`w${i}c0`].setOption(cpuOption);
......@@ -147,8 +150,10 @@ function addPlots(res, record, imgHandle, begin, end) {
}
imgHandle[`w${i}c2`].setOption(loadOption);
} else {
var workerTitle = document.getElementById(`${i}`);
workerTitle.innerText = i===0 ? "Master" : `Worker ${worker.hostname}`;
if (i > 0){
var workerTitle = document.getElementById(`${i}`);
workerTitle.innerText = `Worker ${worker.hostname}`
}
imgHandle[`w${i}c0`].setOption(cpuOption);
imgHandle[`w${i}c1`].setOption(memoryOption);
imgHandle[`w${i}c2`].setOption(loadOption);
......
......@@ -26,7 +26,7 @@ from parl.remote.monitor import ClusterMonitor
from multiprocessing import Process
@parl.remote_class(max_memory=300)
@parl.remote_class(max_memory=350)
class Actor(object):
def __init__(self, x=10):
self.x = x
......@@ -64,10 +64,18 @@ class TestMaxMemory(unittest.TestCase):
time.sleep(20)
self.assertEqual(1, cluster_monitor.data['clients'][0]['actor_num'])
del actor
time.sleep(5)
time.sleep(10)
p = Process(target=self.actor)
p.start()
time.sleep(30)
for _ in range(6):
x = cluster_monitor.data['clients'][0]['actor_num']
if x == 0:
break
else:
time.sleep(10)
if x == 1:
raise ValueError("Actor max memory test failed.")
self.assertEqual(0, cluster_monitor.data['clients'][0]['actor_num'])
p.terminate()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册