未验证 提交 063e9b20 编写于 作者: K Kaipeng Deng 提交者: GitHub

fix threads not exit on ctrl+c (#463)

上级 eb475f5c
...@@ -35,6 +35,7 @@ import traceback ...@@ -35,6 +35,7 @@ import traceback
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
main_pid = os.getpid()
worker_set = set() worker_set = set()
...@@ -131,7 +132,8 @@ class ParallelMap(object): ...@@ -131,7 +132,8 @@ class ParallelMap(object):
self._consumers.append(p) self._consumers.append(p)
p.daemon = True p.daemon = True
setattr(p, 'id', consumer_id) setattr(p, 'id', consumer_id)
worker_set.add(p) if use_process:
worker_set.add(p)
self._epoch = -1 self._epoch = -1
self._feeding_ev = Event() self._feeding_ev = Event()
...@@ -288,12 +290,22 @@ class ParallelMap(object): ...@@ -288,12 +290,22 @@ class ParallelMap(object):
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit()) signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit())
# FIXME(dkp): KeyboardInterrupt should be handled inside ParallelMap
# and do such as: 1. exit workers 2. close queues 3. release shared
# memory, HACK KeyboardInterrupt with global signal.SIGINT handler
# here, should be refined later
def _term_workers(sig_num, frame): def _term_workers(sig_num, frame):
global worker_set global worker_set, main_pid
logger.info("main proc {} exit, kill subprocess {}".format( # only do subporcess killing in main process
pid, [w.pid for w in worker_set])) if os.getpid() != main_pid:
return
logger.info("KeyboardInterrupt: main proc {} exit, kill subprocess {}" \
.format(os.getpid(), [w.pid for w in worker_set]))
for w in worker_set: for w in worker_set:
os.kill(w, signal.SIGKILL) if w.pid is not None:
os.kill(w.pid, signal.SIGINT)
sys.exit()
signal.signal(signal.SIGINT, _term_workers) signal.signal(signal.SIGINT, _term_workers)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册