scripts.py 9.0 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20 21
import os
import random
import re
import socket
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25 26
import threading
import warnings
F
fuyw 已提交
27
import zmq
F
fuyw 已提交
28
from multiprocessing import Process
29 30
from parl.utils import get_ip_address, to_str
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
31 32 33 34 35 36 37 38

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error.
try:
    locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
except locale.Error:
    # The en_US.UTF-8 locale is not installed on every host; keep the
    # current locale instead of failing while this module is imported.
    warnings.warn("Locale en_US.UTF-8 is not available, "
                  "keep using the current locale.")

# ResourceWarning is only defined on Python 3, so the filter is installed
# there only.
# TODO: this line was reported to cause an error in python2/macOS.
if sys.version_info.major == 3:
    warnings.simplefilter("ignore", ResourceWarning)
F
fuyw 已提交
42 43


F
fuyw 已提交
44 45 46 47 48
def get_free_tcp_port():
    """Ask the operating system for a TCP port that is currently unused.

    A socket is bound to port 0 so that the OS picks a free port; the socket
    is closed again before returning, so the port is only known to have been
    free at the time of the call.

    Returns:
        str: the chosen port number, as a decimal string.
    """
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp.bind(('', 0))
        # getsockname() returns (address, port); only the port is needed.
        port = tcp.getsockname()[1]
    finally:
        # Release the socket even if bind() fails.
        tcp.close()
    return str(port)
F
fuyw 已提交
50 51


B
Bo Zhou 已提交
52
def is_port_available(port):
    """Check whether a local TCP port is free.

    The check tries to connect to ``localhost:port``. If the connection
    succeeds, something is already listening there and the port is in use.

    Args:
        port (int or str): the port number to probe.

    Returns:
        bool: True if the port is available (the connection attempt failed),
        False if the port is already in use.
    """
    port = int(port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex() returns 0 on success and an errno value otherwise, so
        # a non-zero result means nothing accepted the connection.
        return sock.connect_ex(('localhost', port)) != 0
    finally:
        sock.close()
F
fuyw 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86


def is_master_started(address):
    """Probe whether a master node answers at the given address.

    A ``[NORMAL]`` request is sent over a ZeroMQ REQ socket and the function
    waits for any reply, bounded by a receive timeout of 500 (milliseconds,
    per the ZeroMQ ``RCVTIMEO`` option).

    Args:
        address (str): address of the master; it is appended to ``tcp://``.

    Returns:
        bool: True if a reply was received, False if the receive timed out.
    """
    ctx = zmq.Context()
    # Named `sock` so that the stdlib `socket` module is not shadowed.
    sock = ctx.socket(zmq.REQ)
    try:
        sock.linger = 0
        sock.setsockopt(zmq.RCVTIMEO, 500)
        sock.connect("tcp://{}".format(address))
        sock.send_multipart([b'[NORMAL]'])
        try:
            sock.recv_multipart()
        except zmq.error.Again:
            # No reply before the receive timeout expired.
            return False
        return True
    finally:
        # Always release the socket (dropping pending messages) and then
        # the context, whatever happened above.
        sock.close(0)
        ctx.term()


@click.group()
def cli():
    # Root click command group; the sub-commands defined in this file are
    # attached to it with cli.add_command(). Documented with a comment rather
    # than a docstring because click would show a docstring as help text.
    pass


@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in the debugging mode to print all running log.",
    is_flag=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port, debug):
    # Start a master process, optionally a local worker process, and a
    # cluster monitor process, then print usage hints.
    #
    # Args:
    #     port (str): port of the master on localhost.
    #     cpu_num (int or None): CPUs for the local worker; defaults to all
    #         CPUs of this machine, and no worker is started when it is 0.
    #     monitor_port (str or None): port of the monitor; a free port is
    #         picked when it is not given.
    #     debug (bool): when set, the DEBUG environment variable is set.
    #
    # Raises:
    #     Exception: if the master port or the given monitor port is in use.
    #
    # NOTE: kept as comments because click would expose a docstring as the
    # command's help text.
    if debug:
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = int(
        cpu_num) if cpu_num is not None else multiprocessing.cpu_count()
    monitor_port = monitor_port if monitor_port else get_free_tcp_port()

    # Locate the sibling scripts from this file's directory instead of
    # slicing/replacing pieces of __file__, which breaks on paths without '/'.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    start_file = os.path.join(script_dir, 'start.py')
    monitor_file = os.path.join(script_dir, 'monitor.py')

    master_command = [
        sys.executable, start_file, "--name", "master", "--port", port
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        "localhost:" + str(port), "--cpu_num",
        str(cpu_num)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", "localhost:" + str(port)
    ]

    # Redirect the output to DEVNULL to solve the warning log. The handle is
    # closed by the `with` block even if launching a process fails.
    with open(os.devnull, 'w') as devnull:
        subprocess.Popen(
            master_command, stdout=devnull, stderr=subprocess.STDOUT)
        if cpu_num > 0:
            subprocess.Popen(
                worker_command, stdout=devnull, stderr=subprocess.STDOUT)
        subprocess.Popen(
            monitor_command, stdout=devnull, stderr=subprocess.STDOUT)

    if cpu_num > 0:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # A local worker with {} CPUs is connected to the cluster.    

            # Starting the cluster monitor...""".format(
            port,
            cpu_num,
        )
    else:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # Starting the cluster monitor...""".format(port)
    click.echo(monitor_info)

    # check if monitor is started
    cmd = r'ps -ef | grep remote/monitor.py\ --monitor_port\ {}\ --address\ localhost:{}'.format(
        monitor_port, port)

    monitor_is_started = False
    for _ in range(3):
        matched_lines = os.popen(cmd).read().strip().split('\n')
        # Exactly two matching lines are treated as "started" -- presumably
        # the monitor process plus the grep helper itself; TODO confirm.
        if len(matched_lines) == 2:
            monitor_is_started = True
            break
        time.sleep(3)
    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)
F
fuyw 已提交
196

F
fuyw 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212

@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
def start_worker(address, cpu_num):
    # Start a worker process that connects to the master at `address`.
    #
    # Args:
    #     address (str): address of the master node.
    #     cpu_num (int or None): CPUs for the worker; an empty string is
    #         passed to start.py when it is not set (or is 0).
    #
    # Raises:
    #     Exception: if the master does not answer at `address`.
    #
    # NOTE: kept as comments because click would expose a docstring as the
    # command's help text.
    if not is_master_started(address):
        raise Exception("Worker can not connect to the master node, " +
                        "please check if the input address {} ".format(
                            address) + "is correct.")
    cpu_num = str(cpu_num) if cpu_num else ''
    # Locate start.py next to this file instead of slicing a fixed number of
    # characters off __file__.
    start_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'start.py')
    command = [
        sys.executable, start_file, "--name", "worker", "--address", address,
        "--cpu_num", cpu_num
    ]
    subprocess.Popen(command)


@click.command("stop", help="Exit the cluster.")
def stop():
    """Kill the processes whose command line matches a cluster script.

    `pkill -f` is run once for each of remote/start.py, remote/job.py and
    remote/monitor.py. The exit status of pkill is ignored.
    """
    for script in ("remote/start.py", "remote/job.py", "remote/monitor.py"):
        try:
            # An argv list without a shell: no quoting is involved and no
            # intermediate shell process carries the pattern in its own
            # command line.
            subprocess.call(["pkill", "-f", script])
        except OSError as e:
            # pkill itself could not be executed; report it instead of
            # aborting with a traceback.
            click.echo("Fail to run pkill: {}".format(e))
            break
F
fuyw 已提交
228 229


230 231
@click.command("status")
def status():
    # Print the status of every cluster that has a worker process running on
    # this machine.
    #
    # Clusters are discovered from the `--address` argument of running
    # remote/start.py worker processes; for each of them the monitor process
    # is looked up and the master is asked for its status over ZeroMQ.
    #
    # NOTE: kept as comments because click would expose a docstring as the
    # command's help text.
    cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'
    content = os.popen(cmd).read().strip()
    clusters = set(re.findall('--address (.*?) --cpu', content))
    if not clusters:
        click.echo('No active cluster is found.')
        return

    # Compiled once instead of on every loop iteration.
    monitor_pattern = re.compile('--monitor_port (.*?)\n', re.S)
    reports = []
    ctx = zmq.Context()
    try:
        for cluster in clusters:
            cmd = r'ps -ef | grep address\ {}'.format(cluster)
            content = os.popen(cmd).read()
            monitors = monitor_pattern.findall(content)

            if not monitors:
                msg = """
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster)
                reports.append(msg)
                continue

            # The captured text is expected to look like
            # "<monitor_port> --address <master_address>".
            monitor_port, _, master_address = monitors[0].split(' ')
            monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
            # Named `sock` so that the stdlib `socket` module is not shadowed.
            sock = ctx.socket(zmq.REQ)
            try:
                sock.setsockopt(zmq.RCVTIMEO, 10000)
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                monitor_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                click.echo(
                    'Can not connect to cluster {}, please try later.'.
                    format(master_address))
                continue
            finally:
                # Release the socket on success, timeout and unexpected
                # errors alike.
                sock.close(0)
            msg = """
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address)
            reports.append(msg)
    finally:
        # Every socket has been closed with linger 0 by now, so terminating
        # the context does not wait for pending messages.
        ctx.term()

    for monitor_status in reports:
        click.echo(monitor_status)


F
fuyw 已提交
280 281 282
# Attach every sub-command to the root `cli` group.
for _command in (start_worker, start_master, stop, status):
    cli.add_command(_command)
F
fuyw 已提交
284 285 286 287 288 289 290 291


def main():
    """Run the ``cli`` command group and return whatever it returns."""
    outcome = cli()
    return outcome


# Run the command-line interface when this file is executed as a script.
if __name__ == "__main__":
    main()