master.py 12.3 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pickle
import threading
import time
import zmq
F
fuyw 已提交
20
from collections import deque, defaultdict
21 22
import parl
import sys
F
fuyw 已提交
23
from parl.utils import to_str, to_byte, logger, get_ip_address
F
fuyw 已提交
24
from parl.remote import remote_constants
B
Bo Zhou 已提交
25
from parl.remote.job_center import JobCenter
F
fuyw 已提交
26
from parl.remote.cluster_monitor import ClusterMonitor
B
Bo Zhou 已提交
27 28
import cloudpickle
import time
F
fuyw 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47


class Master(object):
    """Base class for a master node, the control center for our cluster,
    which provides connections to workers and clients.

    There is only one master node in each cluster, and it is responsible for
    receiving jobs from the clients and allocating computation resources to
    run the jobs.

    To start a master node, we use the following xparl command line api:

    .. code-block:: bash

        xparl start --port 1234

    At the same time, a local worker will be started and connect to the
    master node.

    Attributes:
        job_center (JobCenter): A thread-safe data structure that stores the
                                job address of vacant cpus.
        client_socket (zmq.Context.socket): A REP socket that receives
                                           submitted job from the client, and
                                           later sends job_address back to the
                                           client.
        master_ip(str): The ip address of the master node.
        cpu_num(int): The number of available CPUs in the cluster.
        worker_num(int): The number of workers connected to this cluster.
        cluster_monitor(ClusterMonitor): Records worker status and client
                                         status.
        client_hostname(dict): A dict to store hostname for each client address.

    Args:
        port: The ip port that the master node binds to (bound on all
              interfaces as ``tcp://*:<port>``).
        monitor_port: Port of the cluster monitor's HTTP endpoint; only used
                      to build ``monitor_url``, which is sent back to clients
                      as the base of their log URL.
    """

    def __init__(self, port, monitor_port=None):
        self.ctx = zmq.Context()
        self.master_ip = get_ip_address()
        # NOTE(review): when monitor_port is None this URL ends in ":None".
        self.monitor_url = "http://{}:{}".format(self.master_ip, monitor_port)
        logger.set_dir(
            os.path.expanduser('~/.parl_data/master/{}_{}'.format(
                self.master_ip, port)))
        self.client_socket = self.ctx.socket(zmq.REP)
        self.client_socket.bind("tcp://*:{}".format(port))
        self.client_socket.linger = 0
        self.port = port

        self.job_center = JobCenter(self.master_ip)
        self.cluster_monitor = ClusterMonitor()
        # Cleared by `exit()`; every loop in this class polls it.
        self.master_is_alive = True
        self.client_hostname = defaultdict(int)

    def _get_status(self):
        """Return the status snapshot produced by ``cluster_monitor``."""
        return self.cluster_monitor.get_status()

    def _create_worker_monitor(self, worker_address):
        """Send heartbeat signals to a newly connected worker.

        Runs in its own thread, started when a worker reports that it is
        initialized. Every successful heartbeat refreshes the worker's entry
        in ``cluster_monitor``. If the worker does not answer within
        ``HEARTBEAT_TIMEOUT_S`` it is dropped from both ``job_center`` and
        ``cluster_monitor``. The loop also ends when the master exits or any
        other ZMQ error occurs.

        Args:
            worker_address (str): Address of the worker's heartbeat socket,
                connected to as ``tcp://<worker_address>``.
        """
        worker_heartbeat_socket = self.ctx.socket(zmq.REQ)
        worker_heartbeat_socket.linger = 0
        worker_heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
        worker_heartbeat_socket.connect("tcp://" + worker_address)

        try:
            while self.master_is_alive:
                try:
                    worker_heartbeat_socket.send_multipart(
                        [remote_constants.HEARTBEAT_TAG])
                    worker_status = worker_heartbeat_socket.recv_multipart()
                except zmq.error.Again:
                    # No reply within the receive timeout: treat the worker
                    # as lost and release its resources.
                    self.job_center.drop_worker(worker_address)
                    self.cluster_monitor.drop_worker_status(worker_address)
                    logger.warning("\n[Master] Cannot connect to the worker " +
                                   "{}. ".format(worker_address) +
                                   "Worker_pool will drop this worker.")
                    self._print_workers()
                    break
                except zmq.error.ZMQError:
                    # Any other ZMQ failure (e.g. the context being
                    # terminated): stop monitoring this worker.
                    break

                vacant_cpus = self.job_center.get_vacant_cpu(worker_address)
                total_cpus = self.job_center.get_total_cpu(worker_address)
                self.cluster_monitor.update_worker_status(
                    worker_status, worker_address, vacant_cpus, total_cpus)
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
        finally:
            # Always release the socket, even if a monitor call raises.
            worker_heartbeat_socket.close(0)
        logger.warning("Exit worker monitor from master.")

    def _create_client_monitor(self, client_heartbeat_address):
        """Send heartbeat signals to a newly connected client.

        Runs in its own thread, started when a client connects. Every
        successful heartbeat refreshes the client's entry in
        ``cluster_monitor``. If the client does not answer within
        ``HEARTBEAT_TIMEOUT_S`` its status is dropped. The loop also ends when
        the master exits or any other ZMQ error occurs. This matches the
        worker monitor.

        Args:
            client_heartbeat_address (str): The client's
                ``reply_master_heartbeat_address``, connected to as
                ``tcp://<client_heartbeat_address>``.
        """
        client_heartbeat_socket = self.ctx.socket(zmq.REQ)
        client_heartbeat_socket.linger = 0
        client_heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
        client_heartbeat_socket.connect("tcp://" + client_heartbeat_address)

        try:
            while self.master_is_alive:
                try:
                    client_heartbeat_socket.send_multipart(
                        [remote_constants.HEARTBEAT_TAG])
                    client_status = client_heartbeat_socket.recv_multipart()
                except zmq.error.Again:
                    # No reply within the receive timeout: the client is gone.
                    self.cluster_monitor.drop_client_status(
                        client_heartbeat_address)
                    logger.warning("[Master] cannot connect to the client " +
                                   "{}. ".format(client_heartbeat_address) +
                                   "Please check if it is still alive.")
                    break
                except zmq.error.ZMQError:
                    # Any other ZMQ failure (e.g. the context being
                    # terminated): stop monitoring this client.
                    break

                self.cluster_monitor.update_client_status(
                    client_status, client_heartbeat_address,
                    self.client_hostname[client_heartbeat_address])
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
        finally:
            # Always release the socket, even if a monitor call raises.
            client_heartbeat_socket.close(0)
        logger.warning("Master exits client monitor for {}.\n".format(
            client_heartbeat_address))
        self._print_workers()

    def _print_workers(self):
        """Log how many workers are connected and how many CPUs are vacant."""
        logger.info(
            "Master connects to {} workers and have {} vacant CPUs.\n".format(
                self.worker_num, self.cpu_num))

    @property
    def cpu_num(self):
        """int: CPU count reported by ``job_center.cpu_num``."""
        return self.job_center.cpu_num

    @property
    def worker_num(self):
        """int: Worker count reported by ``job_center.worker_num``."""
        return self.job_center.worker_num

    def _receive_message(self):
        """Receive one message on ``client_socket`` and send exactly one reply.

        Handled tags:

        * ``WORKER_CONNECT_TAG``: a new worker connects.
        * ``MONITOR_TAG``: return the cluster status snapshot.
        * ``STATUS_TAG``: the ``xparl status`` command line API.
        * ``WORKER_INITIALIZED_TAG``: register a worker and monitor it.
        * ``CLIENT_CONNECT_TAG``: register a client and monitor it.
        * ``CHECK_VERSION_TAG``: report parl and Python major versions.
        * ``CLIENT_SUBMIT_TAG``: job submission.
        * ``NEW_JOB_TAG``: a worker replaces a finished job (reset job).
        * ``NORMAL_TAG``: check before starting a worker.

        Raises:
            NotImplementedError: If the tag is not one of the above.
        """
        message = self.client_socket.recv_multipart()
        tag = message[0]

        # a new worker connects to the master
        if tag == remote_constants.WORKER_CONNECT_TAG:
            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        elif tag == remote_constants.MONITOR_TAG:
            status = self._get_status()
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG, status])

        # `xparl status` command line API
        elif tag == remote_constants.STATUS_TAG:
            status_info = self.cluster_monitor.get_status_info()
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(status_info)])

        elif tag == remote_constants.WORKER_INITIALIZED_TAG:
            # SECURITY(review): cloudpickle.loads executes arbitrary code from
            # the payload; the master port must only be reachable by trusted
            # cluster members.
            initialized_worker = cloudpickle.loads(message[1])
            worker_address = initialized_worker.worker_address
            self.job_center.add_worker(initialized_worker)
            hostname = self.job_center.get_hostname(worker_address)
            self.cluster_monitor.add_worker_status(worker_address, hostname)
            logger.info("A new worker {} is added, ".format(worker_address) +
                        "the cluster has {} CPUs.\n".format(self.cpu_num))

            # a thread for sending heartbeat signals to `worker.address`
            thread = threading.Thread(
                target=self._create_worker_monitor,
                args=(worker_address, ))
            thread.start()

            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        # a client connects to the master
        elif tag == remote_constants.CLIENT_CONNECT_TAG:
            # `client_heartbeat_address` is the
            #      `reply_master_heartbeat_address` of the client
            client_heartbeat_address = to_str(message[1])
            client_hostname = to_str(message[2])
            client_id = to_str(message[3])
            self.client_hostname[client_heartbeat_address] = client_hostname
            logger.info(
                "Client {} is connected.".format(client_heartbeat_address))

            thread = threading.Thread(
                target=self._create_client_monitor,
                args=(client_heartbeat_address, ))
            thread.start()
            log_monitor_address = "{}/logs?client_id={}".format(
                self.monitor_url, client_id)
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(log_monitor_address)])

        elif tag == remote_constants.CHECK_VERSION_TAG:
            self.client_socket.send_multipart([
                remote_constants.NORMAL_TAG,
                to_byte(parl.__version__),
                to_byte(str(sys.version_info.major))
            ])

        # a client submits a job to the master
        elif tag == remote_constants.CLIENT_SUBMIT_TAG:
            # check available CPU resources
            job = None
            if self.cpu_num:
                logger.info("Submitting job...")
                job = self.job_center.request_job()
            # Worker-monitor threads may drop workers between the cpu_num
            # check and request_job(); never dereference a missing job, and
            # always reply so the REP socket stays usable.
            # NOTE(review): assumes request_job() returns None when no job is
            # vacant — confirm against JobCenter.
            if job is None:
                self.client_socket.send_multipart([remote_constants.CPU_TAG])
            else:
                self.client_socket.send_multipart([
                    remote_constants.NORMAL_TAG,
                    to_byte(job.job_address),
                    to_byte(job.client_heartbeat_address),
                    to_byte(job.ping_heartbeat_address),
                ])
                client_id = to_str(message[2])
                job_info = {job.job_id: job.log_server_address}
                self.cluster_monitor.add_client_job(client_id, job_info)
                self._print_workers()

        # a worker updates
        elif tag == remote_constants.NEW_JOB_TAG:
            # SECURITY(review): see the cloudpickle note above.
            initialized_job = cloudpickle.loads(message[1])
            last_job_address = to_str(message[2])

            # Reply first so the worker is not blocked on the bookkeeping.
            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
            self.job_center.update_job(last_job_address, initialized_job,
                                       initialized_job.worker_address)
            logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))

            self._print_workers()

        # check before start a worker
        elif tag == remote_constants.NORMAL_TAG:
            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        else:
            raise NotImplementedError(
                "Unknown message tag received by master: {!r}".format(tag))

    def exit(self):
        """ Close the master.

        Only clears ``master_is_alive``. ``run()`` and every monitor thread
        notice the flag the next time they poll it and then shut down.
        """
        self.master_is_alive = False

    def run(self):
        """An infinite loop waiting for messages from the workers and
        clients.

        Each iteration handles one message via ``_receive_message``. This
        includes worker connection and initialization, job updates from
        workers, client connection and job submission, and status, monitor
        and version queries. The receive timeout (``HEARTBEAT_RCVTIMEO_S``)
        lets the loop periodically re-check ``master_is_alive``. When the loop
        ends, the master's REP socket is closed so the port is released.
        """
        self.client_socket.linger = 0
        self.client_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)

        try:
            while self.master_is_alive:
                try:
                    self._receive_message()
                except zmq.error.Again:
                    # detect whether `self.master_is_alive` is True periodically
                    pass
        finally:
            self.client_socket.close(0)

        logger.warning("[Master] Exit master.")