# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import socket
import os
import six
import copy
import signal
import random
import threading
import traceback
import subprocess
from paddle.distributed.fleet import cloud_utils
from paddle.distributed.fleet import launch_utils

from paddle.distributed.utils.log_utils import get_logger

logger = get_logger("INFO", "ELASTIC")

ELASTIC_EXIT_CODE = 101
ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102

# wait for timeout, unit: seconds
ELASTIC_TIMEOUT = 2 * 60

# keepalived ttl, unit: seconds
ELASTIC_TTL = 60


# 1: Fault tolerance, 2: Elastic
class ElasticLevel:
    """Job elasticity modes used by ElasticManager.

    FAULT_TOLERANCE: the job only restarts when exactly np nodes are alive.
    ELASTIC: the node count may vary between min_np and max_np.
    """

    FAULT_TOLERANCE = 1
    ELASTIC = 2


class ElasticStatus:
    """Lifecycle states reported by ElasticManager.watch() to the launch loop."""

    COMPLETED = "completed"
    ERROR = "error"
    HOLD = "hold"
    RESTART = "restart"
    EXIT = "exit"


class LauncherInterface(object):
    """Base class for process launchers.

    Keeps the list of spawned child processes and provides helpers to
    terminate them (SIGTERM first, escalating to SIGKILL) and to poll their
    health. Subclasses implement launch/stop/watch.
    """

    def __init__(self, args):
        self.args = args
        self.procs = []

    def _terminate_procs(self):
        """Terminate all children; return True when every process has exited."""
        # try to terminate process by group, this happend in multiprocess senario in user process
        if os.name != 'nt':
            for entry in self.procs:
                if entry.proc.poll() is not None:
                    continue
                os.killpg(os.getpgid(entry.proc.pid), signal.SIGTERM)
                if entry.log_fn:
                    entry.log_fn.close()
                logger.info(
                    "terminate process group gid:{}".format(entry.proc.pid)
                )
            time.sleep(1)

        # second pass: terminate any process that ignored the group signal
        for entry in self.procs:
            if entry.proc.poll() is None:
                entry.proc.terminate()
                if entry.log_fn:
                    entry.log_fn.close()
                logger.info("terminate process id:{}".format(entry.proc.pid))

        # escalate to SIGKILL, retrying once per second for up to 50 seconds
        for _ in range(50):
            survivors = False
            for entry in self.procs:
                if entry.proc.poll() is None:  # not termniate
                    os.kill(entry.proc.pid, signal.SIGKILL)
                    survivors = True

            if not survivors:
                logger.info("terminated all the procs")
                return True

            time.sleep(1)
        return False

    def _check_procs(self):
        """Poll children: None while any runs, 0 when all succeeded, else the
        first non-zero exit code (auto-parallel code returns immediately)."""
        any_running = False
        failure = None
        for entry in self.procs:
            code = entry.proc.poll()
            if code is None:
                any_running = True
            elif code != 0:
                if code == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
                    logger.info("return form elastic auto parallel re-launch")
                    return code
                logger.error("ABORT!!! ABORT!!! ABORT!!!")
                logger.error(
                    "ERROR rank {} error with exit code {}, check log for detail.".format(
                        entry.rank, code
                    )
                )
                failure = code
        if not any_running and failure is None:
            return 0
        return failure

    def launch(self):
        raise NotImplementedError

    def stop(self):
        raise NotImplementedError

    def watch(self):
        raise NotImplementedError


class ElasticManager(object):
    """Coordinates elastic / fault-tolerant distributed training via etcd.

    Registers this node under an etcd job prefix, keeps the registration
    alive with a lease-heartbeat thread, and installs watches on peer
    membership and on the published endpoints so the local launcher can be
    restarted when the job topology changes.
    """

    def __init__(self, args, etcd_client):
        # `args` is the parsed launcher argument namespace; `etcd_client` is
        # only stored/used when elastic mode turns out to be enabled below.

        self.args = args
        server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
        name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
        self.min_np, self.max_np = self._parse_np(args.np)
        host = args.host or os.getenv('POD_IP')
        # NOTE(review): `scale` and `force` are parsed but never read in this
        # constructor — presumably kept for CLI compatibility; confirm before
        # removing.
        scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0))
        force = args.force or os.getenv('PADDLE_ELASTIC_FORCE')

        # fall back to DNS-based self-lookup when no host was given
        self.host = host if host else self._get_host()

        (
            self.device_mode,
            self.devices_per_proc,
        ) = launch_utils.get_device_proc_info(args)

        # how long to wait (seconds) for enough nodes before giving up
        self.elastic_timeout = int(
            os.getenv('PADDLE_ELASTIC_TIMEOUT', ELASTIC_TIMEOUT)
        )
        # etcd lease TTL; the heartbeat refreshes at ttl/3
        elastic_ttl = int(os.getenv('PADDLE_ELASTIC_TTL', ELASTIC_TTL))

        # derive trainers / endpoints either from PaddleCloud env vars or
        # from the command-line ip list
        self.start_port = None
        if cloud_utils.use_paddlecloud():
            self.trainers = os.getenv('PADDLE_TRAINERS', '')
            self.np = len(self.trainers.split(","))
            self.start_port = int(os.getenv("PADDLE_PORT", "6170"))
            self.dist_endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '')
            trainer_endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS', '')
            self.trainer_endpoints_list = trainer_endpoints.split(",")
        else:
            self.trainers = args.ips or os.getenv('PADDLE_TRAINERS', '')
            node_ips = self.trainers.split(",")
            self.np = len(node_ips)
            self.start_port = int(os.getenv("FLAGS_START_PORT", "6170"))
            self.dist_endpoints = self._host_to_endpoints(
                node_ips, self.devices_per_proc, self.start_port
            )
            self.trainer_endpoints_list = [
                "%s:%d" % (ip, self.start_port) for ip in node_ips
            ]

        self.curr_host = "%s:%d" % (self.host, self.start_port)
        logger.info(f'start job with np={self.np}')
        logger.info(
            f"trainers={self.trainers}, trainer_endpoints_list={self.trainer_endpoints_list}"
        )

        # auto correct the value of elastic_level
        # 1: Fault tolerant, 2: Elastic
        self.elastic_level = int(
            os.getenv(
                'PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL',
                ElasticLevel.FAULT_TOLERANCE,
            )
        )
        if self.min_np == self.max_np or (self.min_np > 0 and self.max_np == 0):
            self.elastic_level = ElasticLevel.FAULT_TOLERANCE
            logger.info('start job with ElasticLevel.FAULT_TOLERANCE')
        if self.min_np > 0 and self.max_np > self.min_np:
            self.elastic_level = ElasticLevel.ELASTIC
            logger.info('start job with ElasticLevel.ELASTIC')

        # compatible with kuberntes service discovery
        if (
            not server
            and os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST')
            and os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT')
        ):
            server = '{}:{}'.format(
                os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
                os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'),
            )

        logger.debug('init with server {} host {}'.format(server, host))

        self.hosts = []
        self.stopped = False

        self.sigint = 0
        self.need_sync = False

        self.elastic_startup_time = None

        # elastic is disabled unless a "host:port" etcd server, a job name
        # and a non-zero np are all available
        if not server or ':' not in server or not name or not self.np:
            logger.info(
                'Elastic is not enabled with server {} name {} and np {}'.format(
                    server, name, self.np
                )
            )
            self.enable = False
            return
        else:
            self.enable = True

        self.etcd = etcd_client

        # etcd data
        self.prefix = "/paddle/" + name
        self.node_prefix = self.prefix + '/nodes'
        self.np_path = self.prefix + '/np'
        self.endpoints_path = self.prefix + '/endpoints'

        # random tag + timestamp keeps each node's etcd key unique
        node_tag = ''.join(
            random.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(6)
        )
        self.host_path = '{}/{}{}'.format(
            self.node_prefix, node_tag, time.time()
        )
        '''
        0 group mode, be aware of healthy status of other workers
        1 decouple mode, check own status only
        '''
        self.etcd.put(self.prefix, b'0')

        # register callback
        def host_call_back(event):
            # membership changed: refresh the (deduplicated) host set and
            # flag the watch loop to resynchronize
            self.hosts = [
                six.ensure_str(i[0])
                for i in self.etcd.get_prefix(self.node_prefix)
            ]
            self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
            logger.info(
                f"host_call_back curr_host={self.curr_host}, hosts:{self.hosts}"
            )
            self.need_sync = True
            self.elastic_startup_time = None

        host_watch = self.etcd.add_watch_prefix_callback(
            self.node_prefix, host_call_back
        )
        host_lease = self.etcd.lease(elastic_ttl)

        # register etcd lease heartbeat
        def lease_heartbeat():
            # refresh the lease forever; re-register this host if its key
            # has expired; any error ends the heartbeat thread
            while True:
                try:
                    host_lease.refresh()

                    hosts = [
                        six.ensure_str(i[0])
                        for i in self.etcd.get_prefix(self.node_prefix)
                    ]
                    hosts = list(set(hosts)) if hosts else hosts
                    logger.info(
                        f"[lease_heartbeat] curr_host={self.curr_host}, hosts={hosts}"
                    )
                    if self.curr_host not in hosts:
                        logger.info(
                            f"[lease_heartbeat] register host={self.curr_host}"
                        )
                        self.etcd.put(
                            self.host_path,
                            self.curr_host.encode('latin-1'),
                            lease=host_lease,
                        )
                except Exception as e:
                    logger.error(
                        "[lease_heartbeat] internal error:{} {}".format(
                            e, traceback.format_exc()
                        )
                    )
                    break
                time.sleep(elastic_ttl / 3)

        keepalived_thread = threading.Thread(
            name='lease_heartbeat', target=lease_heartbeat, daemon=True
        )
        keepalived_thread.start()

        self.etcd.put(
            self.host_path, self.curr_host.encode('latin-1'), lease=host_lease
        )

        # endpoints handle DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS
        self.etcd.put(
            self.endpoints_path,
            '{}|{}'.format(self.dist_endpoints, self.trainers).encode(
                'latin-1'
            ),
        )

        def endpoints_call_back(event):
            # endpoints key changed: adopt the new "endpoints|trainers" pair
            if not self.dist_endpoints:
                return
            edps = six.ensure_str(self.etcd.get(self.endpoints_path)[0] or '')
            self.dist_endpoints, self.trainers = edps.split('|')
            logger.info(
                "set DISTRIBUTED_TRAINER_ENDPOINTS {} ".format(
                    self.dist_endpoints
                )
            )
            logger.info("set PADDLE_TRAINERS {} ".format(self.trainers))

        endpoints_watch = self.etcd.add_watch_callback(
            self.endpoints_path, endpoints_call_back
        )

        self.watches = [host_watch, endpoints_watch]
        self.launcher = None
330 331 332
    def _host_to_endpoints(
        self, ip_port_list: list, devices_per_proc: list, start_port: int = 6170
    ) -> str:
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
        endpoint_list = []
        for ip_port in ip_port_list:
            endpoints = ip_port.split(":")
            if len(endpoints) == 2:
                ip = endpoints[0]
                port = int(endpoints[1])
            else:
                ip = endpoints
                port = start_port

            ports = [x for x in range(port, port + len(devices_per_proc))]
            endpoint_list.extend(["%s:%d" % (ip, port) for port in ports])

        dist_endpoints = ','.join(endpoint_list)
        return dist_endpoints

349 350 351
    def exit(self, completed=False):
        logger.info('manager exist completed {}'.format(completed))

K
kuizhiqing 已提交
352 353
        if self.launcher:
            self.launcher.stop()
K
kuizhiqing 已提交
354

355 356 357 358 359 360 361 362 363 364 365 366 367 368
        if not self.enable:
            return

        if completed:
            self.etcd.put(self.prefix, b'1')

        for watch in self.watches:
            self.etcd.cancel_watch(watch)
        self.etcd.delete(self.host_path)

        hosts = [i for i in self.etcd.get_prefix(self.node_prefix)]
        if len(hosts) == 0:
            self.etcd.delete_prefix(self.prefix)

369 370 371 372
    def pre_hook(self):
        if not self.args.elastic_pre_hook:
            logger.info("skip pre_hook")
            return
373
        logger.info("execute pre_hook...")
374
        current_env = copy.copy(os.environ.copy())
375 376 377 378 379 380 381
        out, err = subprocess.Popen(
            self.args.elastic_pre_hook,
            env=current_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
        ).communicate()
382
        if err:
R
Roc 已提交
383
            logger.warning("pre_hook exec failed")
384 385 386
        else:
            logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}")

387 388
    def _parse_np(self, np: str):
        """
389
        np format is "MIN" or "MIN:MAX"
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
        """
        np_str = np or os.getenv('PADDLE_ELASTIC_NP', "0")
        np_dict = np_str.split(":")
        min_np = max_np = 0
        if len(np_dict) == 1:
            # Fault tolerant
            min_np = int(np_dict[0])
            min_np = 1 if min_np <= 0 else min_np
            max_np = 1
        elif len(np_dict) == 2:
            # Elastic
            min_np = int(np_dict[0])
            max_np = int(np_dict[1])
            min_np = 1 if min_np <= 0 else min_np
            max_np = min_np if min_np > max_np else max_np
        else:
            raise ValueError(
407 408
                f'the np={np} needs to be in "MIN" or "MIN:MAX" format'
            )
409 410 411

        return min_np, max_np

412 413 414 415 416 417 418 419 420 421 422 423
    def _get_host(self):
        try:
            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        except:
            return '127.0.0.1'

    def _completed(self):
        if not self.enable:
            return True

        return int(self.etcd.get(self.prefix)[0]) == 1

    def _match(self, host_list: list = None):
        """Decide whether the current host set is sufficient to (re)start.

        Refreshes ``self.hosts`` (from *host_list* or etcd, deduplicated).
        FAULT_TOLERANCE requires exactly np hosts. ELASTIC accepts np or
        max_np hosts immediately, and between min_np and max_np only after
        ``elastic_timeout`` seconds have elapsed since the first shortfall.
        """
        if host_list:
            self.hosts = host_list
        else:
            self.hosts = [
                six.ensure_str(i[0])
                for i in self.etcd.get_prefix(self.node_prefix)
            ]
        # deduplicate; keep the falsy value unchanged when empty
        self.hosts = list(set(self.hosts)) if self.hosts else self.hosts

        if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
            if len(self.hosts) == self.np:
                return True
            else:
                return False

        if self.elastic_level == ElasticLevel.ELASTIC:
            hosts_num = len(self.hosts)
            if hosts_num == self.np:
                return True

            # start (or continue) the grace-period clock on a host-count mismatch
            if not self.elastic_startup_time:
                self.elastic_startup_time = time.time()
            if hosts_num == self.max_np:
                self.elastic_startup_time = None
                return True
            elif hosts_num >= self.min_np and hosts_num < self.max_np:
                # enough hosts to run, but keep waiting for more until the
                # timeout expires
                interval_time = time.time() - self.elastic_startup_time
                if interval_time <= self.elastic_timeout:
                    logger.info(
                        f"wait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT, \
                        hosts_num={hosts_num}, min_np={self.min_np}, \
                        interval_time={interval_time}, elastic_timeout={self.elastic_timeout}"
                    )
                    return False
                return True
            else:
                # below min_np: reset the clock and keep waiting
                self.elastic_startup_time = None
                return False

        return False

    def _update_endpoint(self, endpoints, hosts):
467 468 469 470
        self.etcd.put(
            self.endpoints_path,
            '{}|{}'.format(endpoints, hosts).encode('latin-1'),
        )
471

    def _update_fault_tolrance(self):
        """Refresh trainer env vars for a fault-tolerant (fixed-np) restart.

        When this host is already part of the published endpoints, simply
        re-export them; otherwise recompute this node's rank from its
        position in ``self.hosts`` (preserving a previously assigned rank
        by swapping entries) and export the new trainer list.
        """
        rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
        logger.debug(
            f"self.curr_host={self.curr_host}, self.dist_endpoints={self.dist_endpoints}"
        )
        if self.curr_host in self.dist_endpoints:
            os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
            os.environ['PADDLE_TRAINERS'] = self.trainers
            logger.info(
                "update env DISTRIBUTED_TRAINER_ENDPOINTS {} ".format(
                    self.dist_endpoints
                )
            )
            logger.info("update env PADDLE_TRAINERS {} ".format(self.trainers))
            return

        # fault tolerance
        idx = self.hosts.index(self.curr_host)

        # swap if self.host not in the right position
        if rank >= 0:
            # keep this node at its previous rank slot
            self.hosts[idx] = self.hosts[rank]
            self.hosts[rank] = self.curr_host
        else:
            # no previous rank: adopt the list position as the new rank
            os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx)
        hosts = ','.join([host_port.split(":")[0] for host_port in self.hosts])
        self.args.ips = hosts
        os.environ['PADDLE_TRAINERS'] = hosts

    def _update_elastic_scale_out(self):
        """Append newly joined nodes to the endpoint list and refresh env vars."""
        endpoint_list = copy.deepcopy(self.trainer_endpoints_list)
        logger.info(
            f"elastic scale out, from {len(self.hosts)} to {self.np}, hosts={self.hosts}, host_endpoints={endpoint_list}"
        )

        # preserve existing order; unknown hosts are appended at the tail
        for hp in self.hosts:
            if hp not in endpoint_list:
                endpoint_list.append(hp)

        os.environ['PADDLE_TRAINER_ID'] = '{}'.format(
            endpoint_list.index(self.curr_host)
        )
        ip_csv = ','.join([hp.split(":")[0] for hp in endpoint_list])
        self.args.ips = ip_csv
        os.environ['PADDLE_TRAINERS'] = ip_csv
        self.np = len(endpoint_list)
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join(endpoint_list)
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
        self.trainer_endpoints_list = endpoint_list

    def _update_elastic_scale_in(self):
        """Shrink the trainer set while minimizing rank reassignment.

        Surviving hosts keep their original rank slot when possible; hosts
        whose slot disappeared are packed into the freed low ranks. Finally
        re-exports env vars and publishes the new endpoints to etcd.
        """
        host_endpoints = copy.deepcopy(self.trainer_endpoints_list)
        logger.info(
            f"elastic scale in, from {self.np} to {len(self.hosts)}, hosts={self.hosts}, host_endpoints={host_endpoints}"
        )

        # If scale in node from the first of the rank list, you need to minimize the movement of the rank
        # eg:
        #   the source trainers is:10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3
        #   10.10.10.0 is removed
        #   the new trainers is:10.10.10.3,10.10.10.1,10.10.10.2
        #   In this case, the rank of 10.10.10.1 and 10.10.10.2 remains unchanged, while the rank of 10.10.10.3 is set to rank0
        endpoints_dict = dict()
        unsorted_endpoints = []
        # first pass: keep a survivor in its old slot when that slot is still
        # a valid rank and not already claimed
        for id, host_port in enumerate(self.hosts):
            idx = host_endpoints.index(host_port)
            if idx <= len(self.hosts) - 1 and not endpoints_dict.get(idx):
                endpoints_dict[idx] = host_port
            else:
                unsorted_endpoints.append(host_port)

        # second pass: fill the remaining rank slots with displaced hosts
        idle_index = 0
        sorted_endpoints = []
        for idx in range(len(self.hosts)):
            if not endpoints_dict.get(idx) and len(unsorted_endpoints) > 0:
                endpoints_dict[idx] = unsorted_endpoints[idle_index]
                idle_index += 1

            sorted_endpoints.append(endpoints_dict.get(idx))

        logger.info(f"elastic scale in, sorted_endpoints={sorted_endpoints}")
        self.trainer_endpoints_list = sorted_endpoints

        ip_list = [ip_port.split(":")[0] for ip_port in sorted_endpoints]
        hosts = ','.join(ip_list)
        new_endpoints = self._host_to_endpoints(
            sorted_endpoints, self.devices_per_proc
        )

        self.args.ips = hosts
        os.environ['PADDLE_TRAINER_ID'] = '{}'.format(
            sorted_endpoints.index(self.curr_host)
        )
        os.environ['PADDLE_TRAINERS'] = hosts
        self.np = len(sorted_endpoints)
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = ','.join(sorted_endpoints)
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = new_endpoints
        self._update_endpoint(new_endpoints, hosts)

    def _update_hosts(self):
        """Dispatch to the right host-update strategy for the new host set."""
        assert len(self.hosts) != 0, 'hosts empty'
        if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
            self._update_fault_tolrance()
            return

        # elastic
        current = len(self.hosts)
        if current == self.np:
            logger.info(f"elastic startup, hosts={self.hosts}")
            self._update_fault_tolrance()
        elif current > self.np:
            # scale out
            self._update_elastic_scale_out()
        else:
            # scale in
            self._update_elastic_scale_in()

    def wait(self):
        if not self.enable:
            return

K
kuizhiqing 已提交
594
        idx = 1
595 596 597 598 599
        while not self.stopped:
            if self._match():
                logger.info('ready with hosts {}'.format(self.hosts))
                self._update_hosts()
                return
600 601 602
            logger.info(
                'not ready for np {} with hosts {}'.format(self.np, self.hosts)
            )
K
kuizhiqing 已提交
603
            idx += 1
K
kuizhiqing 已提交
604
            time.sleep(2)
605 606 607 608 609 610 611 612 613 614 615
        return

    def run(self, launcher):
        if self.stopped:
            return

        self.launcher = launcher(self.args)
        self.launcher.launch()

    def watch(self):
        """Monitor the running launcher and translate its fate into an ElasticStatus.

        Returns HOLD on auto-parallel re-launch or membership change,
        COMPLETED on clean exit, RESTART/ERROR on failure (depending on the
        elastic level), and EXIT when the manager was stopped.
        """

        # clear a pending sync request before entering the loop
        if self.need_sync:
            self.need_sync = False

        while not self.stopped:
            ret = self.launcher.watch()
            logger.debug(f"launcher.watch():{ret}")

            if ret is not None:  # self terminated
                logger.info('job exit with code {}'.format(ret))
                if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
                    logger.info('job re-launch for auto parallel')
                    self.launcher.stop()
                    return ElasticStatus.HOLD

                # process is completed if ret >= 0 or error else
                completed = True if ret == 0 else False
                self.exit(completed=completed)
                if completed:
                    return ElasticStatus.COMPLETED
                if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
                    return ElasticStatus.RESTART
                else:
                    return ElasticStatus.ERROR

            # membership changed (or sync requested) while job still healthy:
            # stop and let the caller re-launch with the new topology
            if not self._completed() and (not self._match() or self.need_sync):
                self.launcher.stop()
                return ElasticStatus.HOLD

            time.sleep(2)

        if self.launcher:
            self.launcher.stop()

        return ElasticStatus.EXIT

    def signal_handler(self, sigint, frame):
        if self.enable:
            self.exit()
        self.sigint = sigint
        self.stopped = True