master.py 12.1 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pickle
import threading
import time
import zmq
F
fuyw 已提交
20 21
from collections import deque, defaultdict
from parl.utils import to_str, to_byte, logger, get_ip_address
F
fuyw 已提交
22
from parl.remote import remote_constants
B
Bo Zhou 已提交
23
from parl.remote.job_center import JobCenter
F
fuyw 已提交
24
from parl.remote.cluster_monitor import ClusterMonitor
B
Bo Zhou 已提交
25 26
import cloudpickle
import time
F
fuyw 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45


class Master(object):
    """Base class for a master node, the control center for our cluster,
    which provides connections to workers and clients.

    There is only one master node in each cluster, and it is responsible for
    receiving jobs from the clients and allocating computation resources to
    run the jobs.

    To start a master node, we use the following xparl command line api:

    .. code-block:: python

        xparl start --port 1234

    At the same time, a local worker will be started and connect to the
    master node.

    Attributes:
        ctx (zmq.Context): ZeroMQ context used to create every socket of the
            master.
        job_center (JobCenter): A thread-safe data structure that stores the
            job address of vacant cpus.
        client_socket (zmq.Context.socket): A REP socket bound to `port` that
            receives every request sent to the master (worker and client
            connections, job submissions, status queries) and replies to it.
        master_ip (str): The ip address of the master node.
        port: The port the master node binds to.
        monitor_url (str): Base URL of the cluster monitor web service, used
            to build the per-client log URL returned to connecting clients.
        cpu_num (int): The number of vacant CPUs in the cluster.
        worker_num (int): The number of workers connected to this cluster.
        cluster_monitor (ClusterMonitor): Records worker status and client
            status.
        client_hostname (dict): Maps each client heartbeat address to the
            hostname reported by that client.
        master_is_alive (bool): Loop flag of `run` and of every monitor
            thread; cleared by `exit`.

    Args:
        port: The ip port that the master node binds to.
        monitor_port: Port of the cluster monitor web service; only used to
            build `monitor_url`.
    """

    def __init__(self, port, monitor_port=None):
        self.ctx = zmq.Context()
        self.master_ip = get_ip_address()
        # NOTE(review): with monitor_port=None the URL ends in ":None";
        # callers presumably always pass it -- confirm.
        self.monitor_url = "http://{}:{}".format(self.master_ip, monitor_port)
        logger.set_dir(
            os.path.expanduser('~/.parl_data/master/{}_{}'.format(
                self.master_ip, port)))
        self.client_socket = self.ctx.socket(zmq.REP)
        self.client_socket.bind("tcp://*:{}".format(port))
        self.client_socket.linger = 0
        self.port = port

        self.job_center = JobCenter(self.master_ip)
        self.cluster_monitor = ClusterMonitor()
        self.master_is_alive = True
        # client heartbeat address -> hostname reported on CLIENT_CONNECT_TAG
        self.client_hostname = defaultdict(int)

    def _get_status(self):
        """Return the cluster status produced by `cluster_monitor`.

        The value is sent as-is in a multipart reply to MONITOR_TAG requests,
        so it is presumably already serialized to bytes -- TODO confirm.
        """
        return self.cluster_monitor.get_status()

    def _drop_client(self, client_heartbeat_address):
        """Forget a client whose heartbeat timed out.

        Removes its status from `cluster_monitor` and its entry from
        `client_hostname`, so that disconnected clients do not accumulate.
        """
        self.cluster_monitor.drop_client_status(client_heartbeat_address)
        self.client_hostname.pop(client_heartbeat_address, None)

    def _create_worker_monitor(self, worker_address):
        """Send heartbeat signals to a newly connected worker until it dies.

        Runs in its own thread, started from `_receive_message`. While the
        master is alive, a HEARTBEAT_TAG request is sent every
        HEARTBEAT_INTERVAL_S seconds. Each reply is forwarded to
        `cluster_monitor`, together with the worker's vacant and total CPU
        counts from `job_center`. If no reply arrives within
        HEARTBEAT_TIMEOUT_S seconds, the worker is dropped from both
        `job_center` and `cluster_monitor`.

        Args:
            worker_address (str): Address (host:port) the worker listens on
                for heartbeat requests.
        """
        worker_heartbeat_socket = self.ctx.socket(zmq.REQ)
        worker_heartbeat_socket.linger = 0
        worker_heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
        worker_heartbeat_socket.connect("tcp://" + worker_address)

        try:
            while self.master_is_alive:
                try:
                    worker_heartbeat_socket.send_multipart(
                        [remote_constants.HEARTBEAT_TAG])
                    worker_status = worker_heartbeat_socket.recv_multipart()
                except zmq.error.Again:
                    # No reply within HEARTBEAT_TIMEOUT_S: the worker is
                    # considered dead and its CPUs are removed.
                    self.job_center.drop_worker(worker_address)
                    self.cluster_monitor.drop_worker_status(worker_address)
                    logger.warning("\n[Master] Cannot connect to the worker " +
                                   "{}. ".format(worker_address) +
                                   "Worker_pool will drop this worker.")
                    self._print_workers()
                    break
                except zmq.error.ZMQError:
                    # Any other ZMQ failure (presumably the context being
                    # torn down at shutdown): stop monitoring.
                    break

                vacant_cpus = self.job_center.get_vacant_cpu(worker_address)
                total_cpus = self.job_center.get_total_cpu(worker_address)
                self.cluster_monitor.update_worker_status(
                    worker_status, worker_address, vacant_cpus, total_cpus)
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
        finally:
            worker_heartbeat_socket.close(0)
        logger.warning("Exit worker monitor from master.")

    def _create_client_monitor(self, client_heartbeat_address):
        """Send heartbeat signals to a newly connected client until it dies.

        Runs in its own thread, started from `_receive_message`. While the
        master is alive, a HEARTBEAT_TAG request is sent every
        HEARTBEAT_INTERVAL_S seconds, and each reply is forwarded to
        `cluster_monitor`. If no reply arrives within HEARTBEAT_TIMEOUT_S
        seconds, the client is forgotten (see `_drop_client`).

        Args:
            client_heartbeat_address (str): The client's
                `reply_master_heartbeat_address` (host:port).
        """
        client_heartbeat_socket = self.ctx.socket(zmq.REQ)
        client_heartbeat_socket.linger = 0
        client_heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
        client_heartbeat_socket.connect("tcp://" + client_heartbeat_address)

        try:
            while self.master_is_alive:
                try:
                    client_heartbeat_socket.send_multipart(
                        [remote_constants.HEARTBEAT_TAG])
                    client_status = client_heartbeat_socket.recv_multipart()
                except zmq.error.Again:
                    self._drop_client(client_heartbeat_address)
                    logger.warning("[Master] cannot connect to the client " +
                                   "{}. ".format(client_heartbeat_address) +
                                   "Please check if it is still alive.")
                    break
                except zmq.error.ZMQError:
                    # Same policy as the worker monitor: stop on any other
                    # ZMQ failure instead of letting the thread crash with
                    # its socket still open.
                    break

                self.cluster_monitor.update_client_status(
                    client_status, client_heartbeat_address,
                    self.client_hostname[client_heartbeat_address])
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
        finally:
            client_heartbeat_socket.close(0)

        logger.warning("Master exits client monitor for {}.\n".format(
            client_heartbeat_address))
        self._print_workers()

    def _print_workers(self):
        """Log the number of connected workers and vacant CPUs."""
        logger.info(
            "Master connects to {} workers and have {} vacant CPUs.\n".format(
                self.worker_num, self.cpu_num))

    @property
    def cpu_num(self):
        """Number of vacant CPUs, as reported by `job_center`."""
        return self.job_center.cpu_num

    @property
    def worker_num(self):
        """Number of connected workers, as reported by `job_center`."""
        return self.job_center.worker_num

    def _receive_message(self):
        """Receive one request on `client_socket` and send exactly one reply.

        The first frame of every message is a tag selecting the handler:

        * WORKER_CONNECT_TAG: a new worker connects to the master.
        * MONITOR_TAG: the monitor asks for the cluster status.
        * STATUS_TAG: the `xparl status` command line API.
        * WORKER_INITIALIZED_TAG: a worker registers itself (cloudpickled
          worker object in frame 1) and a heartbeat thread is started for it.
        * CLIENT_CONNECT_TAG: a client connects (frames 1-3: heartbeat
          address, hostname, client id); the reply carries its log URL.
        * CLIENT_SUBMIT_TAG: a client submits a job (client id in frame 2).
        * NEW_JOB_TAG: a worker sends a new job after it killed an old one.
        * NORMAL_TAG: a check before starting a worker.

        Raises:
            zmq.error.Again: if nothing is received within the socket's
                RCVTIMEO.
            NotImplementedError: if the tag is unknown. No reply is sent in
                that case, and a REP socket must reply before its next recv.
        """
        message = self.client_socket.recv_multipart()
        tag = message[0]

        # a new worker connects to the master
        if tag == remote_constants.WORKER_CONNECT_TAG:
            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        elif tag == remote_constants.MONITOR_TAG:
            status = self._get_status()
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG, status])

        # `xparl status` command line API
        elif tag == remote_constants.STATUS_TAG:
            status_info = self.cluster_monitor.get_status_info()
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(status_info)])

        elif tag == remote_constants.WORKER_INITIALIZED_TAG:
            # NOTE(security): cloudpickle.loads executes arbitrary code from
            # the payload; the master port must only be reachable from
            # trusted machines.
            initialized_worker = cloudpickle.loads(message[1])
            worker_address = initialized_worker.worker_address
            self.job_center.add_worker(initialized_worker)
            hostname = self.job_center.get_hostname(worker_address)
            self.cluster_monitor.add_worker_status(worker_address, hostname)
            logger.info("A new worker {} is added, ".format(worker_address) +
                        "the cluster has {} CPUs.\n".format(self.cpu_num))

            # a thread for sending heartbeat signals to `worker.address`
            thread = threading.Thread(
                target=self._create_worker_monitor, args=(worker_address, ))
            thread.start()

            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        # a client connects to the master
        elif tag == remote_constants.CLIENT_CONNECT_TAG:
            # `client_heartbeat_address` is the
            #      `reply_master_heartbeat_address` of the client
            client_heartbeat_address = to_str(message[1])
            client_hostname = to_str(message[2])
            client_id = to_str(message[3])
            self.client_hostname[client_heartbeat_address] = client_hostname
            logger.info(
                "Client {} is connected.".format(client_heartbeat_address))

            thread = threading.Thread(
                target=self._create_client_monitor,
                args=(client_heartbeat_address, ))
            thread.start()
            log_monitor_address = "{}/logs?client_id={}".format(
                self.monitor_url, client_id)
            self.client_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(log_monitor_address)])

        # a client submits a job to the master
        elif tag == remote_constants.CLIENT_SUBMIT_TAG:
            job = None
            # check available CPU resources
            if self.cpu_num:
                logger.info("Submitting job...")
                job = self.job_center.request_job()

            if job is None:
                # No vacant CPU. The `None` check is defensive: a monitor
                # thread may drop a worker between the cpu_num check and
                # request_job (assumes request_job returns None when nothing
                # is vacant -- TODO confirm against JobCenter).
                self.client_socket.send_multipart([remote_constants.CPU_TAG])
            else:
                self.client_socket.send_multipart([
                    remote_constants.NORMAL_TAG,
                    to_byte(job.job_address),
                    to_byte(job.client_heartbeat_address),
                    to_byte(job.ping_heartbeat_address),
                ])
                client_id = to_str(message[2])
                job_info = {job.job_id: job.log_server_address}
                self.cluster_monitor.add_client_job(client_id, job_info)
                self._print_workers()

        # a worker updates: it replaces the job at `last_job_address` with a
        # freshly initialized one
        elif tag == remote_constants.NEW_JOB_TAG:
            # NOTE(security): see WORKER_INITIALIZED_TAG about cloudpickle.
            initialized_job = cloudpickle.loads(message[1])
            last_job_address = to_str(message[2])

            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
            self.job_center.update_job(last_job_address, initialized_job,
                                       initialized_job.worker_address)
            logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))

            self._print_workers()

        # check before start a worker
        elif tag == remote_constants.NORMAL_TAG:
            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

        else:
            raise NotImplementedError(
                "Unknown message tag: {!r}".format(tag))

    def exit(self):
        """Close the master.

        Only clears `master_is_alive`. `run` and the monitor threads check
        this flag on each loop iteration and exit on their own.
        """
        self.master_is_alive = False

    def run(self):
        """Serve requests from workers and clients until `exit` is called.

        Each iteration handles one request via `_receive_message`, e.g.:

        1. A new worker connects to the master node.
        2. A connected worker sending new job address after it kills an old
           job.
        3. A new client connects to the master node.
        4. A connected client submits a job after a remote object is created.

        Status and monitor queries are served as well (see
        `_receive_message`). `client_socket` is closed when the loop ends.
        """
        # Time out `recv` periodically so that `master_is_alive` is rechecked
        # and `exit()` takes effect within HEARTBEAT_RCVTIMEO_S seconds.
        self.client_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)

        try:
            while self.master_is_alive:
                try:
                    self._receive_message()
                except zmq.error.Again:
                    # no request within the timeout; loop to recheck the flag
                    continue
        finally:
            # release the bound port; no other thread uses this socket
            self.client_socket.close(0)

        logger.warning("[Master] Exit master.")