未验证 提交 4c312dab 编写于 作者: B Bo Zhou 提交者: GitHub

fix the thread-safe problem of parl.client (#141)

* fix the thread-safe problem of  parl.client

* yapf
上级 001c4dba
...@@ -202,14 +202,18 @@ class Client(object): ...@@ -202,14 +202,18 @@ class Client(object):
logger.error( logger.error(
'Job {} exceeds max memory usage, will stop this job.'. 'Job {} exceeds max memory usage, will stop this job.'.
format(job_address)) format(job_address))
self.lock.acquire()
self.actor_num -= 1 self.actor_num -= 1
self.lock.release()
job_is_alive = False job_is_alive = False
else: else:
time.sleep(remote_constants.HEARTBEAT_INTERVAL_S) time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
except zmq.error.Again as e: except zmq.error.Again as e:
job_is_alive = False job_is_alive = False
self.lock.acquire()
self.actor_num -= 1 self.actor_num -= 1
self.lock.release()
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
break break
...@@ -248,7 +252,9 @@ class Client(object): ...@@ -248,7 +252,9 @@ class Client(object):
check_result = self._check_and_monitor_job( check_result = self._check_and_monitor_job(
job_heartbeat_address, ping_heartbeat_address) job_heartbeat_address, ping_heartbeat_address)
if check_result: if check_result:
self.lock.acquire()
self.actor_num += 1 self.actor_num += 1
self.lock.release()
return job_address return job_address
# no vacant CPU resources, cannot submit a new job # no vacant CPU resources, cannot submit a new job
......
...@@ -79,15 +79,29 @@ class TestClusterMonitor(unittest.TestCase): ...@@ -79,15 +79,29 @@ class TestClusterMonitor(unittest.TestCase):
time.sleep(1) time.sleep(1)
self.assertEqual(20, len(cluster_monitor.data['workers'])) self.assertEqual(20, len(cluster_monitor.data['workers']))
# check if the number of workers drops by 10
for i in range(10): for i in range(10):
workers[i].exit() workers[i].exit()
time.sleep(60)
self.assertEqual(10, len(cluster_monitor.data['workers'])) check_flag = False
for _ in range(10):
if 10 == len(cluster_monitor.data['workers']):
check_flag = True
break
time.sleep(10)
self.assertTrue(check_flag)
for i in range(10, 20): for i in range(10, 20):
workers[i].exit() workers[i].exit()
time.sleep(60)
self.assertEqual(0, len(cluster_monitor.data['workers'])) # check if the number of workers drops to 0
check_flag = False
for _ in range(10):
if 0 == len(cluster_monitor.data['workers']):
check_flag = True
break
time.sleep(10)
self.assertTrue(check_flag)
master.exit() master.exit()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册