worker.py 15.9 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import cloudpickle
import multiprocessing
import os
F
fuyw 已提交
18
import psutil
B
Bo Zhou 已提交
19
import signal
F
fuyw 已提交
20
import socket
F
fuyw 已提交
21 22
import subprocess
import sys
23
import tempfile
F
fuyw 已提交
24 25
import time
import threading
B
Bo Zhou 已提交
26
import warnings
F
fuyw 已提交
27
import zmq
F
fuyw 已提交
28
from datetime import datetime
F
fuyw 已提交
29

B
Bo Zhou 已提交
30
from parl.utils import get_ip_address, to_byte, to_str, logger, _IS_WINDOWS, kill_process
F
fuyw 已提交
31
from parl.remote import remote_constants
B
Bo Zhou 已提交
32 33 34
from parl.remote.message import InitializedWorker
from parl.remote.status import WorkerStatus
from six.moves import queue
F
fuyw 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58


class Worker(object):
    """Worker provides the cpu computation resources for the cluster.

    A worker node is connected to the master node and will send its
    computation resources information to the master node. When a worker
    node is created, it will start `cpu_num` empty jobs and these jobs'
    ip addresses will be send to the master node. Further, when an old
    job is killed, worker will start a new job and send the new job ip
    address to the master node.

    To start a worker, we use the following xparl command line api:

    .. code-block:: python

        xparl connect --address localhost:1234 --cpu_num 8

    Attributes:
        master_address (str): Master's ip address.
        request_master_socket (zmq.Context.socket): A socket which sends job
                                                    address to the master node.
        reply_job_socket (zmq.Context.socket): A socket which receives
                                               job_address from the job.
B
Bo Zhou 已提交
59
        kill_job_socket (zmq.Context.socket): A socket that receives commands to kill the job from jobs.
60 61
        job_buffer (str): A buffer that stores initialized jobs for providing new jobs in a short time.

F
fuyw 已提交
62 63 64 65 66
    Args:
        master_address (str): IP address of the master node.
        cpu_num (int): Number of cpu to be used on the worker.
    """

67
    def __init__(self, master_address, cpu_num=None, log_server_port=None):
F
fuyw 已提交
68 69 70 71 72 73
        self.lock = threading.Lock()
        self.heartbeat_socket_initialized = threading.Event()
        self.ctx = zmq.Context.instance()
        self.master_address = master_address
        self.master_is_alive = True
        self.worker_is_alive = True
B
Bo Zhou 已提交
74
        self.worker_status = None  # initialized at `self._create_jobs`
F
fuyw 已提交
75
        self._set_cpu_num(cpu_num)
B
Bo Zhou 已提交
76
        self.job_buffer = queue.Queue(maxsize=self.cpu_num)
F
fuyw 已提交
77
        self._create_sockets()
78 79 80
        # create log server
        self.log_server_proc, self.log_server_address = self._create_log_server(
            port=log_server_port)
B
Bo Zhou 已提交
81 82 83

        # create a thread that waits commands from the job to kill the job.
        self.kill_job_thread = threading.Thread(target=self._reply_kill_job)
84
        self.kill_job_thread.setDaemon(True)
B
Bo Zhou 已提交
85 86 87 88 89 90 91 92
        self.kill_job_thread.start()

        self._create_jobs()

        # create a thread that initializes jobs and adds them into the job_buffer
        job_thread = threading.Thread(target=self._fill_job_buffer)
        job_thread.setDaemon(True)
        job_thread.start()
F
fuyw 已提交
93 94 95 96 97 98 99 100 101 102 103 104

    def _set_cpu_num(self, cpu_num=None):
        """set useable cpu number for worker"""
        if cpu_num is not None:
            assert isinstance(
                cpu_num, int
            ), "cpu_num should be INT type, please check the input type."
            self.cpu_num = cpu_num
        else:
            self.cpu_num = multiprocessing.cpu_count()

    def _create_sockets(self):
F
fuyw 已提交
105
        """ Each worker has three sockets at start:
F
fuyw 已提交
106 107

        (1) request_master_socket: sends job address to master node.
F
fuyw 已提交
108 109
        (2) reply_job_socket: receives job_address from subprocess.
        (3) kill_job_socket : receives commands to kill the job from jobs.
F
fuyw 已提交
110

B
Bo Zhou 已提交
111 112
        When a job starts, a new heartbeat socket is created to receive
        heartbeat signals from the job.
F
fuyw 已提交
113 114

        """
F
fuyw 已提交
115
        self.worker_ip = get_ip_address()
F
fuyw 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130

        # request_master_socket: sends job address to master
        self.request_master_socket = self.ctx.socket(zmq.REQ)
        self.request_master_socket.linger = 0

        # wait for 0.5 second to check whether master is started
        self.request_master_socket.setsockopt(zmq.RCVTIMEO, 500)
        self.request_master_socket.connect("tcp://" + self.master_address)

        # reply_job_socket: receives job_address from subprocess
        self.reply_job_socket = self.ctx.socket(zmq.REP)
        self.reply_job_socket.linger = 0
        reply_job_port = self.reply_job_socket.bind_to_random_port("tcp://*")
        self.reply_job_address = "{}:{}".format(self.worker_ip, reply_job_port)

B
Bo Zhou 已提交
131 132 133 134 135 136 137 138
        # kill_job_socket
        self.kill_job_socket = self.ctx.socket(zmq.REP)
        self.kill_job_socket.linger = 0
        kill_job_port = self.kill_job_socket.bind_to_random_port("tcp://*")
        self.kill_job_address = "{}:{}".format(self.worker_ip, kill_job_port)

    def _create_jobs(self):
        """Create jobs and send a instance of ``InitializedWorker`` that contains the worker information to the master."""
F
fuyw 已提交
139 140 141 142 143 144 145 146 147 148
        try:
            self.request_master_socket.send_multipart(
                [remote_constants.WORKER_CONNECT_TAG])
            _ = self.request_master_socket.recv_multipart()
        except zmq.error.Again as e:
            logger.error("Can not connect to the master, "
                         "please check if master is started.")
            self.master_is_alive = False
            return

B
Bo Zhou 已提交
149
        initialized_jobs = self._init_jobs(job_num=self.cpu_num)
F
fuyw 已提交
150 151 152
        self.request_master_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)

B
Bo Zhou 已提交
153
        self.reply_master_hearbeat_thread = threading.Thread(
F
fuyw 已提交
154
            target=self._reply_heartbeat,
B
Bo Zhou 已提交
155
            args=("master {}".format(self.master_address), ))
B
Bo Zhou 已提交
156
        self.reply_master_hearbeat_thread.start()
F
fuyw 已提交
157 158
        self.heartbeat_socket_initialized.wait()

F
fuyw 已提交
159 160 161 162 163 164
        for job in initialized_jobs:
            job.worker_address = self.master_heartbeat_address

        initialized_worker = InitializedWorker(self.master_heartbeat_address,
                                               initialized_jobs, self.cpu_num,
                                               socket.gethostname())
F
fuyw 已提交
165 166
        self.request_master_socket.send_multipart([
            remote_constants.WORKER_INITIALIZED_TAG,
B
Bo Zhou 已提交
167
            cloudpickle.dumps(initialized_worker)
F
fuyw 已提交
168
        ])
F
fuyw 已提交
169

F
fuyw 已提交
170
        _ = self.request_master_socket.recv_multipart()
F
fuyw 已提交
171
        self.worker_status = WorkerStatus(self.master_heartbeat_address,
B
Bo Zhou 已提交
172 173 174 175
                                          initialized_jobs, self.cpu_num)

    def _fill_job_buffer(self):
        """An endless loop that adds initialized job into the job buffer"""
176
        initialized_jobs = []
B
Bo Zhou 已提交
177
        while self.worker_is_alive:
178
            if self.job_buffer.full() is False:
179 180 181 182 183 184 185
                job_num = self.cpu_num - self.job_buffer.qsize()
                if job_num > 0:
                    initialized_jobs = self._init_jobs(job_num=job_num)
                    for job in initialized_jobs:
                        self.job_buffer.put(job)

            time.sleep(0.02)
186
        self.exit()
F
fuyw 已提交
187

B
Bo Zhou 已提交
188 189 190 191 192 193 194 195
    def _init_jobs(self, job_num):
        """Create jobs.

        Args:
            job_num(int): the number of jobs to create.
        """
        job_file = __file__.replace('worker.pyc', 'job.py')
        job_file = job_file.replace('worker.py', 'job.py')
F
fuyw 已提交
196
        command = [
197
            sys.executable, job_file, "--worker_address",
198 199
            self.reply_job_address, "--log_server_address",
            self.log_server_address
F
fuyw 已提交
200 201
        ]

F
fuyw 已提交
202 203 204
        if sys.version_info.major == 3:
            warnings.simplefilter("ignore", ResourceWarning)

B
Bo Zhou 已提交
205 206 207
        # avoid that many jobs are killed and restarted at the same time.
        self.lock.acquire()

B
Bo Zhou 已提交
208 209 210
        # Redirect the output to DEVNULL
        FNULL = open(os.devnull, 'w')
        for _ in range(job_num):
B
Bo Zhou 已提交
211
            subprocess.Popen(command, stdout=FNULL, stderr=subprocess.STDOUT)
B
Bo Zhou 已提交
212 213
        FNULL.close()

B
Bo Zhou 已提交
214
        new_jobs = []
B
Bo Zhou 已提交
215 216
        for _ in range(job_num):
            job_message = self.reply_job_socket.recv_multipart()
B
Bo Zhou 已提交
217 218 219 220 221
            self.reply_job_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(self.kill_job_address)])
            initialized_job = cloudpickle.loads(job_message[1])
            new_jobs.append(initialized_job)
B
Bo Zhou 已提交
222 223 224

            # a thread for sending heartbeat signals to job
            thread = threading.Thread(
B
Bo Zhou 已提交
225
                target=self._create_job_monitor, args=(initialized_job, ))
226
            thread.setDaemon(True)
B
Bo Zhou 已提交
227
            thread.start()
B
Bo Zhou 已提交
228 229 230
        self.lock.release()
        assert len(new_jobs) > 0, "init jobs failed"
        return new_jobs
F
fuyw 已提交
231 232

    def _kill_job(self, job_address):
B
Bo Zhou 已提交
233 234 235 236 237
        """Kill a job process and update worker information"""
        success = self.worker_status.remove_job(job_address)
        if success:
            while True:
                initialized_job = self.job_buffer.get()
F
fuyw 已提交
238
                initialized_job.worker_address = self.master_heartbeat_address
B
Bo Zhou 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251
                if initialized_job.is_alive:
                    self.worker_status.add_job(initialized_job)
                    if not initialized_job.is_alive:  # make sure that the job is still alive.
                        self.worker_status.remove_job(
                            initialized_job.job_address)
                        continue
                else:
                    logger.warning(
                        "[Worker] a dead job found. The job buffer will not accept this one."
                    )
                if initialized_job.is_alive:
                    break

B
Bo Zhou 已提交
252
            self.lock.acquire()
B
Bo Zhou 已提交
253 254 255 256 257 258
            self.request_master_socket.send_multipart([
                remote_constants.NEW_JOB_TAG,
                cloudpickle.dumps(initialized_job),
                to_byte(job_address)
            ])
            _ = self.request_master_socket.recv_multipart()
B
Bo Zhou 已提交
259
            self.lock.release()
F
fuyw 已提交
260

B
Bo Zhou 已提交
261 262
    def _create_job_monitor(self, job):
        """Send heartbeat signals to check target's status"""
F
fuyw 已提交
263 264 265 266 267 268

        # job_heartbeat_socket: sends heartbeat signal to job
        job_heartbeat_socket = self.ctx.socket(zmq.REQ)
        job_heartbeat_socket.linger = 0
        job_heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
B
Bo Zhou 已提交
269
        job_heartbeat_socket.connect("tcp://" + job.worker_heartbeat_address)
F
fuyw 已提交
270

B
Bo Zhou 已提交
271 272
        job.is_alive = True
        while job.is_alive and self.master_is_alive and self.worker_is_alive:
F
fuyw 已提交
273 274 275 276 277 278
            try:
                job_heartbeat_socket.send_multipart(
                    [remote_constants.HEARTBEAT_TAG])
                _ = job_heartbeat_socket.recv_multipart()
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
            except zmq.error.Again as e:
B
Bo Zhou 已提交
279 280 281 282 283 284
                job.is_alive = False
                logger.warning(
                    "[Worker] lost connection with the job:{}".format(
                        job.job_address))
                if self.master_is_alive and self.worker_is_alive:
                    self._kill_job(job.job_address)
F
fuyw 已提交
285 286 287 288 289 290

            except zmq.error.ZMQError as e:
                break

        job_heartbeat_socket.close(0)

B
Bo Zhou 已提交
291 292 293 294 295 296 297 298
    def _reply_kill_job(self):
        """Worker starts a thread to wait jobs' commands to kill the job"""
        self.kill_job_socket.linger = 0
        self.kill_job_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
        while self.worker_is_alive and self.master_is_alive:
            try:
                message = self.kill_job_socket.recv_multipart()
299 300
                tag = message[0]
                assert tag == remote_constants.KILLJOB_TAG
B
Bo Zhou 已提交
301 302 303 304 305 306 307 308
                to_kill_job_address = to_str(message[1])
                self._kill_job(to_kill_job_address)
                self.kill_job_socket.send_multipart(
                    [remote_constants.NORMAL_TAG])
            except zmq.error.Again as e:
                #detect whether `self.worker_is_alive` is True periodically
                pass

F
fuyw 已提交
309 310 311 312 313 314
    def _get_worker_status(self):
        now = datetime.strftime(datetime.now(), '%H:%M:%S')
        virtual_memory = psutil.virtual_memory()
        total_memory = round(virtual_memory[0] / (1024**3), 2)
        used_memory = round(virtual_memory[3] / (1024**3), 2)
        vacant_memory = round(total_memory - used_memory, 2)
H
Hongsheng Zeng 已提交
315 316 317 318
        if _IS_WINDOWS:
            load_average = round(psutil.getloadavg()[0], 2)
        else:
            load_average = round(os.getloadavg()[0], 2)
F
fuyw 已提交
319 320
        return (vacant_memory, used_memory, now, load_average)

F
fuyw 已提交
321 322 323 324 325 326 327 328 329 330
    def _reply_heartbeat(self, target):
        """Worker will kill its jobs when it lost connection with the master.
        """

        socket = self.ctx.socket(zmq.REP)
        socket.linger = 0
        socket.setsockopt(zmq.RCVTIMEO,
                          remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
        heartbeat_master_port =\
            socket.bind_to_random_port("tcp://*")
B
Bo Zhou 已提交
331
        self.master_heartbeat_address = "{}:{}".format(self.worker_ip,
F
fuyw 已提交
332
                                                       heartbeat_master_port)
F
fuyw 已提交
333 334 335

        logger.set_dir(
            os.path.expanduser('~/.parl_data/worker/{}'.format(
H
Hongsheng Zeng 已提交
336
                self.master_heartbeat_address.replace(':', '_'))))
F
fuyw 已提交
337

F
fuyw 已提交
338 339 340
        self.heartbeat_socket_initialized.set()
        logger.info("[Worker] Connect to the master node successfully. "
                    "({} CPUs)".format(self.cpu_num))
341
        while self.master_is_alive and self.worker_is_alive:
F
fuyw 已提交
342 343
            try:
                message = socket.recv_multipart()
F
fuyw 已提交
344 345 346 347 348 349 350 351
                worker_status = self._get_worker_status()
                socket.send_multipart([
                    remote_constants.HEARTBEAT_TAG,
                    to_byte(str(worker_status[0])),
                    to_byte(str(worker_status[1])),
                    to_byte(worker_status[2]),
                    to_byte(str(worker_status[3]))
                ])
F
fuyw 已提交
352 353 354 355 356
            except zmq.error.Again as e:
                self.master_is_alive = False
            except zmq.error.ContextTerminated as e:
                break
        socket.close(0)
357
        logger.warning(
358
            "[Worker] lost connection with the master, will exit reply heartbeat for master."
359
        )
B
Bo Zhou 已提交
360
        self.worker_status.clear()
361 362
        self.log_server_proc.kill()
        self.log_server_proc.wait()
363 364
        # exit the worker
        self.worker_is_alive = False
365
        self.exit()
F
fuyw 已提交
366

367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
    def _create_log_server(self, port):
        log_server_file = __file__.replace('worker.pyc', 'log_server.py')
        log_server_file = log_server_file.replace('worker.py', 'log_server.py')

        if port is None:
            port = "0"  # `0` means using a random port in flask
        command = [
            sys.executable, log_server_file, "--port",
            str(port), "--log_dir", "~/.parl_data/job/", "--line_num", "500"
        ]

        if sys.version_info.major == 3:
            warnings.simplefilter("ignore", ResourceWarning)

        if _IS_WINDOWS:
            FNULL = tempfile.TemporaryFile()
        else:
            FNULL = open(os.devnull, 'w')
        log_server_proc = subprocess.Popen(
            command,
            stdout=FNULL,
            stderr=subprocess.STDOUT,
        )
        FNULL.close()

        log_server_address = "{}:{}".format(self.worker_ip, port)
        return log_server_proc, log_server_address

F
fuyw 已提交
395
    def exit(self):
396
        """close the worker"""
F
fuyw 已提交
397
        self.worker_is_alive = False
B
Bo Zhou 已提交
398
        kill_process('remote/job.py.*{}'.format(self.reply_job_address))
F
fuyw 已提交
399 400

    def run(self):
B
Bo Zhou 已提交
401
        """Keep running until it lost connection with the master.
F
fuyw 已提交
402
        """
B
Bo Zhou 已提交
403
        self.reply_master_hearbeat_thread.join()