scripts.py 9.1 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20 21
import os
import random
import re
import socket
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25 26
import threading
import warnings
F
fuyw 已提交
27
import zmq
F
fuyw 已提交
28
from multiprocessing import Process
29 30
from parl.utils import get_ip_address, to_str
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
31 32 33 34 35 36

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error.
try:
    locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
except locale.Error:
    # The en_US.UTF-8 locale may not be installed on this machine; carry on
    # with the default locale instead of refusing to start. Only the
    # "unsupported locale" error is swallowed, so Ctrl-C etc. still work.
    pass

# ResourceWarning is not defined on Python 2, hence the version guard.
# TODO: this line will cause error in python2/macOS
if sys.version_info.major == 3:
    warnings.simplefilter("ignore", ResourceWarning)
F
fuyw 已提交
45 46


F
fuyw 已提交
47 48 49 50 51
def get_free_tcp_port():
    """Return a TCP port number that the OS reports as currently free.

    The port is obtained by binding a throw-away socket to port 0 and
    reading back the port the OS assigned. The socket is closed before
    returning, so another process could still grab the port before the
    caller binds it.

    Returns:
        str: the port number as a decimal string.
    """
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp.bind(('', 0))
        port = tcp.getsockname()[1]
    finally:
        # Always release the descriptor, even if bind/getsockname fails.
        tcp.close()
    return str(port)
F
fuyw 已提交
53 54


B
Bo Zhou 已提交
55
def is_port_available(port):
    """Check whether nothing is listening on ``port`` on localhost.

    A TCP connection to ``localhost:port`` is attempted. If it fails
    (typically "connection refused"), no server owns the port and it is
    reported as available.

    Args:
        port (int or str): the port number to probe.

    Returns:
        bool: True if the port is available (the connection failed),
        False if something accepted the connection (the port is in use).
    """
    port = int(port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 when the connection succeeds and an error
        # number otherwise, instead of raising.
        return sock.connect_ex(('localhost', port)) != 0
    finally:
        sock.close()
F
fuyw 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89


def is_master_started(address):
    """Probe whether a master node answers at ``address``.

    Sends a ``[NORMAL]`` request over a zmq REQ socket and waits up to
    500 ms for any reply.

    Args:
        address (str): ``"ip:port"`` of the master node.

    Returns:
        bool: True if a reply arrived before the timeout, False otherwise.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    try:
        # Discard pending messages on close, so neither close() nor
        # ctx.term() blocks when the master is unreachable.
        sock.linger = 0
        sock.setsockopt(zmq.RCVTIMEO, 500)
        sock.connect("tcp://{}".format(address))
        sock.send_multipart([b'[NORMAL]'])
        try:
            sock.recv_multipart()
        except zmq.error.Again:
            # No reply within RCVTIMEO.
            return False
        return True
    finally:
        sock.close(0)
        # Release the context's I/O threads instead of leaking one
        # context per probe.
        ctx.term()


@click.group()
def cli():
    # Root command group: it has no behavior of its own. The sub-commands
    # are attached to it with ``cli.add_command`` at module level.
    return None


@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in the debugging mode to print all running log.",
    is_flag=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port, debug):
    # Start a PARL cluster on this machine as three child processes: a
    # master on `port`, a local worker with `cpu_num` CPUs (all CPUs when
    # omitted; skipped unless cpu_num > 0), and a cluster monitor on
    # `monitor_port` (a free port when omitted). Then print usage hints.
    # Raises Exception if the master or monitor port is already taken.
    # (Plain comments rather than a docstring: click would show a docstring
    # as the command's --help text.)
    if debug:
        # Child processes inherit this environment variable.
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = int(
        cpu_num) if cpu_num is not None else multiprocessing.cpu_count()

    # start.py and monitor.py sit next to this file. os.path copes with a
    # bare relative __file__ and with a compiled scripts.pyc.
    remote_dir = os.path.dirname(__file__)
    start_file = os.path.join(remote_dir, 'start.py')
    monitor_file = os.path.join(remote_dir, 'monitor.py')
    monitor_port = monitor_port if monitor_port else get_free_tcp_port()

    master_command = [
        sys.executable, start_file, "--name", "master", "--port", port
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        "localhost:" + str(port), "--cpu_num",
        str(cpu_num)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", "localhost:" + str(port)
    ]

    # Redirect the output to DEVNULL to solve the warning log. Each child
    # gets its own copy of the descriptor, so closing ours afterwards is
    # safe. `with` also closes it if a Popen call raises.
    with open(os.devnull, 'w') as fnull:
        subprocess.Popen(
            master_command, stdout=fnull, stderr=subprocess.STDOUT)
        if cpu_num > 0:
            subprocess.Popen(
                worker_command, stdout=fnull, stderr=subprocess.STDOUT)
        monitor_process = subprocess.Popen(
            monitor_command, stdout=fnull, stderr=subprocess.STDOUT)

    if cpu_num > 0:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # A local worker with {} CPUs is connected to the cluster.    

            # Starting the cluster monitor...""".format(
            port,
            cpu_num,
        )
    else:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # Starting the cluster monitor...""".format(port)
    click.echo(monitor_info)

    # The monitor counts as started while its process has not exited. This
    # checks the process handle directly instead of counting `ps | grep`
    # output lines.
    monitor_is_started = monitor_process.poll() is None
    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)
F
fuyw 已提交
199

F
fuyw 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215

@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
def start_worker(address, cpu_num):
    # Launch a worker process (start.py --name worker) attached to the
    # master at `address`. First verify that the master answers; raise
    # Exception if it does not.
    if not is_master_started(address):
        raise Exception("Worker can not connect to the master node, " +
                        "please check if the input address {} ".format(
                            address) + "is correct.")
    # When --cpu_num is omitted (or 0), an empty value is forwarded and
    # start.py decides the CPU count. Presumably it uses all CPUs, per the
    # option help text; TODO confirm in start.py.
    cpu_num = str(cpu_num) if cpu_num else ''
    # Resolve start.py next to this file. This also handles scripts.pyc,
    # where slicing off a fixed-length file name produced remote//start.py.
    start_file = os.path.join(os.path.dirname(__file__), 'start.py')
    command = [
        sys.executable, start_file, "--name", "worker", "--address",
        address, "--cpu_num", cpu_num
    ]
    subprocess.Popen(command)


@click.command("stop", help="Exit the cluster.")
def stop():
    # SIGKILL every process whose `ps aux` line mentions one of the PARL
    # remote scripts, in this order: start.py (master and workers), then
    # job.py, then monitor.py.
    for script_name in ('start.py', 'job.py', 'monitor.py'):
        kill_cmd = (
            "ps aux | grep remote/%s | awk '{print $2}' | xargs kill -9" %
            script_name)
        subprocess.call([kill_cmd], shell=True)
F
fuyw 已提交
234 235


236 237
@click.command("status")
def status():
    # Print the status of every PARL cluster that has a worker running on
    # this machine. Clusters are found by scanning `ps` output for worker
    # processes. Each cluster's master address is read from its monitor's
    # command line, and the master is asked for a status report over zmq.
    cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'
    content = os.popen(cmd).read().strip()
    pattern = re.compile('--address (.*?) --cpu')
    clusters = set(pattern.findall(content))
    if not clusters:
        click.echo('No active cluster is found.')
        return

    # The monitor is launched as `monitor.py --monitor_port P --address A`.
    # Matching both values directly tolerates anything else on the line.
    monitor_pattern = re.compile(r'--monitor_port (\S+) --address (\S+)')
    ctx = zmq.Context()
    messages = []
    try:
        for cluster in clusters:
            cmd = r'ps -ef | grep address\ {}'.format(cluster)
            content = os.popen(cmd).read()
            monitors = monitor_pattern.findall(content)

            if not monitors:
                msg = """
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster)
                messages.append(msg)
                continue

            monitor_port, master_address = monitors[0]
            monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
            sock = ctx.socket(zmq.REQ)
            try:
                sock.setsockopt(zmq.RCVTIMEO, 10000)
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                monitor_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                # No reply within the 10 s receive timeout.
                click.echo(
                    'Can not connect to cluster {}, please try later.'.
                    format(master_address))
                continue
            finally:
                sock.close(0)
            msg = """
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address)
            messages.append(msg)
    finally:
        # Every socket is closed with linger 0, so term() will not block.
        ctx.term()

    for text in messages:
        click.echo(text)


F
fuyw 已提交
286 287 288
# Attach each sub-command to the root ``cli`` group.
for _command in (start_worker, start_master, stop, status):
    cli.add_command(_command)
del _command
F
fuyw 已提交
290 291 292 293 294 295 296 297


def main():
    """Entry point: dispatch the command line to the ``cli`` group."""
    result = cli()
    return result


if __name__ == '__main__':
    main()