scripts.py 11.9 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20
import os
import random
import re
21
import requests
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25
import threading
H
Hongsheng Zeng 已提交
26
import tempfile
F
fuyw 已提交
27
import warnings
F
fuyw 已提交
28
import zmq
F
fuyw 已提交
29
from multiprocessing import Process
30 31 32
from parl.utils import (_IS_WINDOWS, get_free_tcp_port, get_ip_address,
                        get_port_from_range, is_port_available, kill_process,
                        to_str)
33
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
34 35 36 37 38 39

# A flag to mark that parl is started from the command line: importing this
# module sets XPARL in the environment of the current process, which is also
# the environment inherited by the subprocesses spawned below.
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error by switching to a
# UTF-8 locale when one is available.
if not _IS_WINDOWS:
    try:
        locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
    except locale.Error:
        # Best effort: the requested locale is not usable on this machine, so
        # keep the current one. Only locale.Error is swallowed, so that
        # KeyboardInterrupt/SystemExit are no longer silenced here.
        pass
F
fuyw 已提交
46

B
Bo Zhou 已提交
47 48 49
# TODO: this line will cause error in python2/macOS
# ResourceWarning is only referenced when running under Python 3, where it is
# ignored for the whole process.
if sys.version_info[0] == 3:
    warnings.simplefilter(action="ignore", category=ResourceWarning)
F
fuyw 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67


def is_master_started(address):
    """Check whether something answers a `[NORMAL]` request at `address`.

    A fresh REQ socket is connected to `tcp://<address>`, a single
    `[NORMAL]` frame is sent and a reply is awaited for at most 500 ms.

    Args:
        address (str): endpoint without scheme; it is appended to `tcp://`.

    Returns:
        bool: True if any reply arrived before the receive timeout,
            False if the timeout expired.
    """
    ctx = zmq.Context()
    socket = ctx.socket(zmq.REQ)
    try:
        # Drop unsent frames on close instead of waiting for delivery.
        socket.linger = 0
        socket.setsockopt(zmq.RCVTIMEO, 500)
        socket.connect("tcp://{}".format(address))
        socket.send_multipart([b'[NORMAL]'])
        try:
            socket.recv_multipart()
        except zmq.error.Again:
            # RCVTIMEO expired without a reply.
            return False
        return True
    finally:
        # Release the socket and the context on every path, including
        # unexpected errors (e.g. a malformed address), not only on timeout.
        socket.close(0)
        ctx.term()


68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
def parse_port_range(log_server_port_range):
    """Parse a `start-end` port range string into two integers.

    Args:
        log_server_port_range (str): port range such as `"8000-9000"`.
            Whitespace around the numbers is ignored.

    Returns:
        tuple: `(start, end)` as integers, with `start <= end`.

    Raises:
        Exception: if the input is not exactly `<digits>-<digits>`, or if the
            start port is larger than the end port.
    """
    format_error = (
        "The input log_server_port_range should be `start-end` format.")
    try:
        # Require digits on both sides of the dash and nothing else, so that
        # inputs such as `-`, `8000-`, `1-2-3` or `8000-9000abc` are rejected
        # here with the format message instead of failing later inside int()
        # or tuple unpacking with an unrelated ValueError.
        matched = re.match(r'\s*(\d+)\s*-\s*(\d+)\s*\Z',
                           log_server_port_range)
    except TypeError:
        # Not a string at all (e.g. None).
        matched = None
    if matched is None:
        raise Exception(format_error)

    start, end = int(matched.group(1)), int(matched.group(2))
    if start > end:
        raise Exception(
            "Start port number must be smaller than the end port number.")

    return start, end


def is_log_server_started(ip_address, port):
    """Probe the log server at `ip_address:port`, trying up to three times.

    Each attempt sends a GET to `/get-log`; an HTTP 400 response is what this
    check treats as proof that the log server is running.

    Args:
        ip_address (str): host of the log server.
        port (int or str): port of the log server.

    Returns:
        bool: True as soon as one attempt answers with status 400,
            False if none of the attempts does.
    """
    url = "http://{}:{}/get-log".format(ip_address, port)
    attempts = 3
    for attempt in range(attempts):
        try:
            # Bound every probe so an unresponsive host cannot hang the CLI.
            response = requests.get(url, timeout=3)
        except requests.exceptions.RequestException:
            # Not reachable yet: wait before retrying, but do not delay the
            # caller after the final attempt.
            if attempt < attempts - 1:
                time.sleep(3)
            continue
        if response.status_code == 400:
            return True
    return False


F
fuyw 已提交
95 96 97 98 99 100 101
# Root click command group. The sub-commands defined in this file are attached
# to it through the `cli.add_command(...)` calls near the end of the file.
# NOTE(review): documented with comments on purpose — click presumably uses a
# docstring as the group's help text, so adding one would change `--help`
# output; confirm before converting this into a docstring.
@click.group()
def cli():
    pass


# `start` command: launch a master node, an optional local worker and the
# cluster monitor as background subprocesses, then print how to reach them.
#
# Parameters (filled in by the click options below):
#   port: port the master binds to (string, required).
#   cpu_num: CPUs offered by the local worker; defaults to every CPU of this
#       machine. With 0 no local worker is started.
#   monitor_port: port of the cluster monitor; a free port is picked if unset.
#   debug: when set, DEBUG=True is exported before the subprocesses start.
#   log_server_port_range: `start-end` range from which the port passed to
#       the worker as `--log_server_port` is drawn.
#
# Raises Exception when the master or monitor port is already in use, when
# the port range is malformed, or when the range only contains ports already
# reserved for the master and the monitor.
#
# NOTE(review): documented with comments rather than a docstring because
# click presumably shows a command's docstring in `--help`; confirm.
@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in the debugging mode to print all running log.",
    is_flag=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
@click.option(
    "--log_server_port_range",
    help='''
    Port range (start-end) of the log server on the worker. Default: 8000-9000. 
    The worker will pick a random avaliable port in [start, end] for the log server.
    ''',
    default="8000-9000",
    type=str)
def start_master(port, cpu_num, monitor_port, debug, log_server_port_range):
    if debug:
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = int(
        cpu_num) if cpu_num is not None else multiprocessing.cpu_count()
    # The helper scripts live next to this file (source or compiled).
    start_file = __file__.replace('scripts.pyc', 'start.py')
    start_file = start_file.replace('scripts.py', 'start.py')
    monitor_file = __file__.replace('scripts.pyc', 'monitor.py')
    monitor_file = monitor_file.replace('scripts.py', 'monitor.py')

    monitor_port = monitor_port if monitor_port else get_free_tcp_port()
    start, end = parse_port_range(log_server_port_range)

    # Compare ports as strings: `port` (and a user supplied `monitor_port`)
    # are strings, so comparing them directly with a numeric log server port
    # would never detect a collision.
    reserved_ports = {str(port), str(monitor_port)}
    # A range made up only of reserved ports can never yield a usable port;
    # fail loudly instead of retrying forever. The range is only enumerated
    # when it is at most as large as the reserved set.
    if end - start < len(reserved_ports) and all(
            str(candidate) in reserved_ports
            for candidate in range(start, end + 1)):
        raise Exception(
            "All ports in the input log_server_port_range {} are already "
            "reserved for the master or the monitor.".format(
                log_server_port_range))
    log_server_port = get_port_from_range(start, end)
    while str(log_server_port) in reserved_ports:
        log_server_port = get_port_from_range(start, end)

    # Every argument is passed as a string, whatever type the port helpers
    # returned.
    master_command = [
        sys.executable,
        start_file,
        "--name",
        "master",
        "--port",
        str(port),
        "--monitor_port",
        str(monitor_port),
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        "localhost:" + str(port), "--cpu_num",
        str(cpu_num), '--log_server_port',
        str(log_server_port)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", "localhost:" + str(port)
    ]

    # Redirect the output to DEVNULL to solve the warning log. The handle is
    # closed again even if spawning one of the subprocesses fails.
    with open(os.devnull, 'w') as devnull:
        _ = subprocess.Popen(
            master_command, stdout=devnull, stderr=subprocess.STDOUT)

        if cpu_num > 0:
            # Sleep 1s for master ready
            time.sleep(1)
            _ = subprocess.Popen(
                worker_command, stdout=devnull, stderr=subprocess.STDOUT)

        if _IS_WINDOWS:
            # TODO(@zenghsh3) redirecting stdout of monitor subprocess to FNULL will cause occasional failure
            with tempfile.TemporaryFile() as tmp_file:
                _ = subprocess.Popen(monitor_command, stdout=tmp_file)
        else:
            _ = subprocess.Popen(
                monitor_command, stdout=devnull, stderr=subprocess.STDOUT)

    if cpu_num > 0:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # A local worker with {} CPUs is connected to the cluster.    

            # Starting the cluster monitor...""".format(
            port,
            cpu_num,
        )
    else:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # Starting the cluster monitor...""".format(port)
    click.echo(monitor_info)

    # check if monitor is started by looking for its command line in the
    # process list
    monitor_is_started = False
    if _IS_WINDOWS:
        cmd = r'''wmic process where "commandline like '%remote\\monitor.py --monitor_port {} --address localhost:{}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
            monitor_port, port)
    else:
        cmd = r'ps -ef | grep -v grep | grep remote/monitor.py\ --monitor_port\ {}\ --address\ localhost:{}'.format(
            monitor_port, port)
    for _ in range(3):
        check_monitor_is_started = os.popen(cmd).read()
        if len(check_monitor_is_started) > 0:
            monitor_is_started = True
            break
        time.sleep(3)

    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)

    if not is_log_server_started(master_ip, log_server_port):
        click.echo("# Fail to start the log server.")

F
fuyw 已提交
251 252 253 254 255 256 257 258 259

# `connect` command: start a worker subprocess that joins the master at
# `address`, then report if the log server probe fails.
#
# Parameters (filled in by the click options below):
#   address: address of the master node, checked with `is_master_started`.
#   cpu_num: number of CPUs for the worker; forwarded as an empty string when
#       unset (or 0).
#   log_server_port_range: `start-end` range from which the port passed to
#       the worker as `--log_server_port` is drawn.
#
# Raises Exception when the port range is malformed or the master does not
# answer.
@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--log_server_port_range",
    help='''
    Port range (start-end) of the log server on the worker. Default: 8000-9000. 
    The worker will pick a random avaliable port in [start, end] for the log server.
    ''',
    default="8000-9000",
    type=str)
def start_worker(address, cpu_num, log_server_port_range):
    range_start, range_end = parse_port_range(log_server_port_range)
    log_server_port = get_port_from_range(range_start, range_end)

    master_reachable = is_master_started(address)
    if not master_reachable:
        raise Exception(
            ("Worker can not connect to the master node, "
             "please check if the input address {} "
             "is correct.").format(address))

    cpu_arg = str(cpu_num) if cpu_num else ''
    # start.py sits next to this file, whether it runs from source or .pyc.
    worker_script = __file__.replace('scripts.pyc', 'start.py').replace(
        'scripts.py', 'start.py')

    worker_command = [
        sys.executable,
        worker_script,
        "--name",
        "worker",
        "--address",
        address,
        "--cpu_num",
        cpu_arg,
        "--log_server_port",
        str(log_server_port),
    ]
    _ = subprocess.Popen(worker_command)

    log_server_ok = is_log_server_started(get_ip_address(), log_server_port)
    if not log_server_ok:
        click.echo("# Fail to start the log server.")

F
fuyw 已提交
291 292 293

# `stop` command: call `kill_process` for each of the cluster's scripts, in
# the order start, job, monitor, log server.
@click.command("stop", help="Exit the cluster.")
def stop():
    cluster_scripts = (
        'remote/start.py',
        'remote/job.py',
        'remote/monitor.py',
        'remote/log_server.py',
    )
    for script in cluster_scripts:
        kill_process(script)
F
fuyw 已提交
298 299


300 301
# `status` command: find the clusters that local worker processes are
# connected to (from the process list), ask each cluster's master for its
# status, and print one report per cluster.
# NOTE(review): documented with comments rather than a docstring because
# click presumably shows a command's docstring in `--help`; confirm.
@click.command("status")
def status():
    if _IS_WINDOWS:
        worker_cmd = r'''wmic process where "commandline like '%remote\\start.py --name worker --address%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''
    else:
        worker_cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'

    # Every distinct `--address` found on a worker command line is a cluster.
    worker_listing = os.popen(worker_cmd).read().strip()
    clusters = set(re.findall('--address (.*?) --cpu', worker_listing))
    if not clusters:
        click.echo('No active cluster is found.')
        return

    ctx = zmq.Context()
    reports = []
    monitor_pattern = re.compile('--monitor_port (.*?)\n', re.S)
    for cluster in clusters:
        if _IS_WINDOWS:
            cluster_cmd = r'''wmic process where "commandline like '%address {}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
                cluster)
        else:
            cluster_cmd = r'ps -ef | grep address\ {}'.format(cluster)
        cluster_listing = os.popen(cluster_cmd).read()
        monitors = monitor_pattern.findall(cluster_listing)

        if not monitors:
            reports.append("""
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster))
            continue

        # The captured text is split into exactly three space-separated
        # fields: the monitor port, an ignored field, the master address.
        monitor_port, _, master_address = monitors[0].split(' ')
        monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
        socket = ctx.socket(zmq.REQ)
        socket.setsockopt(zmq.RCVTIMEO, 10000)
        socket.connect('tcp://{}'.format(master_address))
        try:
            socket.send_multipart([STATUS_TAG])
            monitor_info = to_str(socket.recv_multipart()[1])
        except zmq.error.Again:
            # No answer within the receive timeout: report it right away and
            # move on to the next cluster.
            click.echo(
                'Can not connect to cluster {}, please try later.'.format(
                    master_address))
            socket.close(0)
            continue
        reports.append("""
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address))
        socket.close(0)

    for report in reports:
        click.echo(report)


F
fuyw 已提交
358 359 360
# Attach every sub-command defined above to the root `cli` group.
for _subcommand in (start_worker, start_master, stop, status):
    cli.add_command(_subcommand)
F
fuyw 已提交
362 363 364 365 366 367 368 369


def main():
    """Invoke the `cli` command group and hand back whatever it returns."""
    result = cli()
    return result


# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()