scripts.py 12.1 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20
import os
import random
import re
21
import requests
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25
import threading
H
Hongsheng Zeng 已提交
26
import tempfile
F
fuyw 已提交
27
import warnings
F
fuyw 已提交
28
import zmq
F
fuyw 已提交
29
from multiprocessing import Process
30 31 32
from parl.utils import (_IS_WINDOWS, get_free_tcp_port, get_ip_address,
                        get_port_from_range, is_port_available, kill_process,
                        to_str)
33
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
34 35 36 37 38 39

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error.
if not _IS_WINDOWS:
    try:
        locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
    except locale.Error:
        # Best effort only: setlocale raises locale.Error when the requested
        # locale is not recognised; in that case keep the current locale.
        pass
F
fuyw 已提交
46

B
Bo Zhou 已提交
47 48 49
# Silence ResourceWarning for the whole process.
# TODO: this line will cause error in python2/macOS, hence the filter is only
# installed (and the name ResourceWarning only evaluated) when the interpreter
# reports major version 3.
if sys.version_info.major == 3:
    warnings.simplefilter("ignore", ResourceWarning)
F
fuyw 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67


def is_master_started(address):
    """Probe whether a master node answers at ``address``.

    A REQ socket sends a single ``[NORMAL]`` frame and waits at most 500 ms
    (``zmq.RCVTIMEO``) for any reply.

    Args:
        address (str): ``ip:port`` of the master, without the ``tcp://``
            prefix.

    Returns:
        bool: True if a reply arrived in time, False if the receive timed out.
    """
    ctx = zmq.Context()
    socket = ctx.socket(zmq.REQ)
    try:
        socket.linger = 0
        socket.setsockopt(zmq.RCVTIMEO, 500)
        socket.connect("tcp://{}".format(address))
        socket.send_multipart([b'[NORMAL]'])
        try:
            socket.recv_multipart()
        except zmq.error.Again:
            # No reply within the receive timeout.
            return False
        return True
    finally:
        # Release the socket on every path (including connect/send errors),
        # then the context; with linger 0 the termination does not wait for
        # the unsent probe.
        socket.close(0)
        ctx.term()


68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
def parse_port_range(log_server_port_range):
    """Parse a ``start-end`` port range string into two integers.

    The whole string must consist of two non-negative decimal numbers
    separated by a single ``-``; surrounding whitespace is tolerated.

    Args:
        log_server_port_range (str): Range such as ``"8000-9000"``.

    Returns:
        tuple: ``(start, end)`` as ints, with ``start <= end``.

    Raises:
        ValueError: If the input is not a string in ``start-end`` format, or
            if ``start`` is greater than ``end``.
    """
    try:
        # Anchored at both ends so inputs like "-", "8000-" or "1-2-3" are
        # rejected here instead of failing later with an unrelated error.
        matched = re.match(r'\s*(\d+)\s*-\s*(\d+)\s*\Z',
                           log_server_port_range)
    except TypeError:
        # Non-string input (e.g. None).
        matched = None
    if matched is None:
        raise ValueError(
            "The input log_server_port_range should be `start-end` format.")

    start, end = int(matched.group(1)), int(matched.group(2))
    if start > end:
        raise ValueError(
            "Start port number must be smaller than the end port number.")

    return start, end


def is_log_server_started(ip_address, port):
    """Poll the log server's ``/get-log`` endpoint to see whether it is up.

    Up to three GET requests are issued. A response with status code 400 is
    treated as proof that the server is running (presumably because the probe
    carries no query parameters -- confirm against the log server). After
    every unsuccessful attempt the function sleeps 3 seconds and prints a
    progress line.

    Args:
        ip_address (str): Host on which the log server is expected.
        port (int or str): Port of the log server.

    Returns:
        bool: True if a 400 response was received, False otherwise.
    """
    url = "http://{}:{}/get-log".format(ip_address, port)
    for _ in range(3):
        try:
            # Without a timeout an unresponsive host would block the CLI
            # indefinitely.
            response = requests.get(url, timeout=3)
            if response.status_code == 400:
                return True
        except requests.exceptions.RequestException:
            # Connection refused / timed out: the server is not up yet.
            pass
        time.sleep(3)
        click.echo("Checking status of log_server...")
    return False


F
fuyw 已提交
97 98 99 100 101 102 103
# Root click command group of this module; the sub-commands defined below are
# attached to it with cli.add_command(...). Documented with a comment instead
# of a docstring because click would display a docstring as help text.
@click.group()
def cli():
    pass


# `xparl start`: launch a master, an optional local worker and the cluster
# monitor as background subprocesses, then print how to reach them.
#
# Arguments (all supplied by click):
#   port                  -- port of the master (string, required).
#   cpu_num               -- CPUs of the local worker; None means all CPUs of
#                            this machine, 0 means no local worker is started.
#   monitor_port          -- port of the cluster monitor; a free port is
#                            requested from get_free_tcp_port() when omitted.
#   debug                 -- when set, exports DEBUG=True to the environment.
#   log_server_port_range -- `start-end` range for the worker's log server.
#
# Raises Exception when the master/monitor port is already in use or when the
# log server range contains no port other than the master/monitor ports.
#
# NOTE: documented with comments rather than a docstring because click would
# display a docstring as the command's --help text.
@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in the debugging mode to print all running log.",
    is_flag=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
@click.option(
    "--log_server_port_range",
    help='''
    Port range (start-end) of the log server on the worker. Default: 8000-9000. 
    The worker will pick a random avaliable port in [start, end] for the log server.
    ''',
    default="8000-9000",
    type=str)
def start_master(port, cpu_num, monitor_port, debug, log_server_port_range):
    if debug:
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = int(
        cpu_num) if cpu_num is not None else multiprocessing.cpu_count()

    # The helper scripts live next to this file (which may be loaded from a
    # .py or a .pyc).
    start_file = __file__.replace('scripts.pyc', 'start.py')
    start_file = start_file.replace('scripts.py', 'start.py')
    monitor_file = __file__.replace('scripts.pyc', 'monitor.py')
    monitor_file = monitor_file.replace('scripts.py', 'monitor.py')

    monitor_port = monitor_port if monitor_port else get_free_tcp_port()

    start, end = parse_port_range(log_server_port_range)
    # The command line delivers ports as strings while the port helpers may
    # return integers, so normalise to strings before comparing; comparing
    # the raw values would never detect a collision.
    reserved_ports = {str(port), str(monitor_port)}
    # all() stops at the first unreserved port, so this inspects at most three
    # candidates; it prevents the retry loop below from spinning forever.
    if all(str(candidate) in reserved_ports
           for candidate in range(start, end + 1)):
        raise Exception(
            "No port in the range {}-{} is left for the log server.".format(
                start, end))
    log_server_port = get_port_from_range(start, end)
    while str(log_server_port) in reserved_ports:
        log_server_port = get_port_from_range(start, end)

    # Popen only accepts string arguments, hence the explicit str() calls.
    master_command = [
        sys.executable,
        start_file,
        "--name",
        "master",
        "--port",
        str(port),
        "--monitor_port",
        str(monitor_port),
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        "localhost:" + str(port), "--cpu_num",
        str(cpu_num), '--log_server_port',
        str(log_server_port)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", "localhost:" + str(port)
    ]

    # Redirect the output to DEVNULL to solve the warning log.
    with open(os.devnull, 'w') as devnull:
        subprocess.Popen(
            master_command,
            stdout=devnull,
            stderr=subprocess.STDOUT,
            close_fds=True)

        if cpu_num > 0:
            # Sleep 1s for master ready
            time.sleep(1)
            subprocess.Popen(
                worker_command,
                stdout=devnull,
                stderr=subprocess.STDOUT,
                close_fds=True)

        if _IS_WINDOWS:
            # TODO(@zenghsh3) redirecting stdout of monitor subprocess to FNULL will cause occasional failure
            with tempfile.TemporaryFile() as tmp_file:
                subprocess.Popen(
                    monitor_command, stdout=tmp_file, close_fds=True)
        else:
            subprocess.Popen(
                monitor_command,
                stdout=devnull,
                stderr=subprocess.STDOUT,
                close_fds=True)

    if cpu_num > 0:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # A local worker with {} CPUs is connected to the cluster.    

            # Starting the cluster monitor...""".format(
            port,
            cpu_num,
        )
    else:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # Starting the cluster monitor...""".format(port)
    click.echo(monitor_info)

    # check if monitor is started by looking for its command line in the
    # process list (up to three attempts, 3 seconds apart)
    monitor_is_started = False
    if _IS_WINDOWS:
        cmd = r'''wmic process where "commandline like '%remote\\monitor.py --monitor_port {} --address localhost:{}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
            monitor_port, port)
    else:
        cmd = r'ps -ef | grep -v grep | grep remote/monitor.py\ --monitor_port\ {}\ --address\ localhost:{}'.format(
            monitor_port, port)
    for _ in range(3):
        check_monitor_is_started = os.popen(cmd).read()
        if len(check_monitor_is_started) > 0:
            monitor_is_started = True
            break
        time.sleep(3)

    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)

    if not is_log_server_started(master_ip, log_server_port):
        click.echo("# Fail to start the log server.")

F
fuyw 已提交
259 260 261 262 263 264 265 266 267

# `xparl connect`: start a worker subprocess that joins the master at
# `address`, then check that the worker's log server came up.
#
# Arguments (all supplied by click):
#   address               -- address of the master node (required).
#   cpu_num               -- CPUs offered by this worker; when unset (or 0) an
#                            empty value is forwarded to the start script.
#   log_server_port_range -- `start-end` range for the worker's log server.
#
# Raises Exception when the master does not answer at `address`.
#
# NOTE: documented with comments rather than a docstring because click would
# display a docstring as the command's --help text.
@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--log_server_port_range",
    help='''
    Port range (start-end) of the log server on the worker. Default: 8000-9000. 
    The worker will pick a random avaliable port in [start, end] for the log server.
    ''',
    default="8000-9000",
    type=str)
def start_worker(address, cpu_num, log_server_port_range):
    range_start, range_end = parse_port_range(log_server_port_range)
    log_server_port = get_port_from_range(range_start, range_end)

    if not is_master_started(address):
        raise Exception(
            "Worker can not connect to the master node, "
            "please check if the input address {} is correct.".format(address))

    cpu_arg = str(cpu_num) if cpu_num else ''
    # The start script lives next to this file (loaded from a .py or a .pyc).
    start_file = __file__.replace('scripts.pyc', 'start.py').replace(
        'scripts.py', 'start.py')

    worker_command = [
        sys.executable,
        start_file,
        "--name",
        "worker",
        "--address",
        address,
        "--cpu_num",
        cpu_arg,
        "--log_server_port",
        str(log_server_port),
    ]
    subprocess.Popen(worker_command, close_fds=True)

    log_server_up = is_log_server_started(get_ip_address(), log_server_port)
    if not log_server_up:
        click.echo("# Fail to start the log server.")

F
fuyw 已提交
299 300 301

# `xparl stop`: ask kill_process to terminate every helper script of the
# cluster, in the order start, job, monitor, log server.
@click.command("stop", help="Exit the cluster.")
def stop():
    for script in ('remote/start.py', 'remote/job.py', 'remote/monitor.py',
                   'remote/log_server.py'):
        kill_process(script)
F
fuyw 已提交
306 307


308 309
# `xparl status`: find the clusters that local worker processes are connected
# to, ask each master for its status and print one report per cluster.
#
# Clusters are discovered from the process list (worker command lines carry
# `--address <master>`); for each one the monitor's command line supplies the
# monitor port and the master address that is queried with STATUS_TAG.
#
# NOTE: documented with comments rather than a docstring because click would
# display a docstring as the command's --help text.
@click.command("status")
def status():
    if _IS_WINDOWS:
        cmd = r'''wmic process where "commandline like '%remote\\start.py --name worker --address%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''
    else:
        cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'

    content = os.popen(cmd).read().strip()
    clusters = set(re.findall('--address (.*?) --cpu', content))
    if not clusters:
        click.echo('No active cluster is found.')
        return

    # The addresses are read from the command lines of arbitrary processes
    # and are interpolated into a shell command below, so only plain
    # host:port style values are accepted.
    safe_address = re.compile(r'[\w.:\[\]\-]+\Z')
    # Port and master address as they appear on the monitor's command line.
    monitor_pattern = re.compile(r'--monitor_port (\S+) --address (\S+)')

    reports = []
    ctx = zmq.Context()
    try:
        for cluster in clusters:
            if not safe_address.match(cluster):
                # Never hand shell metacharacters to os.popen.
                continue
            if _IS_WINDOWS:
                cmd = r'''wmic process where "commandline like '%address {}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
                    cluster)
            else:
                cmd = r'ps -ef | grep address\ {}'.format(cluster)
            matched = monitor_pattern.search(os.popen(cmd).read())

            if matched is None:
                reports.append("""
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster))
                continue

            monitor_port, master_address = matched.groups()
            monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
            sock = ctx.socket(zmq.REQ)
            try:
                # Wait at most 10 s for the master's answer.
                sock.setsockopt(zmq.RCVTIMEO, 10000)
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                monitor_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                click.echo(
                    'Can not connect to cluster {}, please try later.'.
                    format(master_address))
                continue
            finally:
                # Close on every path so the context can terminate promptly.
                sock.close(0)
            reports.append("""
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address))
    finally:
        ctx.term()

    for report in reports:
        click.echo(report)


F
fuyw 已提交
366 367 368
# Attach every sub-command to the root group (same order as before:
# connect, start, stop, status).
for _subcommand in (start_worker, start_master, stop, status):
    cli.add_command(_subcommand)
F
fuyw 已提交
370 371 372 373 374 375 376 377


def main():
    """Run the ``cli`` command group and return whatever it returns."""
    return cli()


# Allow running this file directly as a script.
if __name__ == "__main__":
    main()