scripts.py 8.7 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20 21
import os
import random
import re
import socket
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25 26
import threading
import warnings
F
fuyw 已提交
27
import zmq
F
fuyw 已提交
28
from multiprocessing import Process
29 30
from parl.utils import get_ip_address, to_str
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
31 32 33 34 35 36 37 38

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error.
# `locale.setlocale` raises `locale.Error` when the requested locale is not
# installed, which would abort the import of this module. Try the preferred
# UTF-8 locale first, then a generic one, and otherwise leave the locale
# untouched instead of crashing.
for _locale_name in ("en_US.UTF-8", "C.UTF-8"):
    try:
        locale.setlocale(locale.LC_ALL, _locale_name)
        break
    except locale.Error:
        continue

# TODO: this line will cause error in python2/macOS
# The version guard keeps `ResourceWarning` from being evaluated on Python 2.
if sys.version_info.major == 3:
    warnings.simplefilter("ignore", ResourceWarning)
F
fuyw 已提交
42 43


F
fuyw 已提交
44 45 46 47 48
def get_free_tcp_port():
    """Return a TCP port number that was free at the time of the call.

    A temporary socket is bound to port 0 on all interfaces, the port the
    socket ended up with is read back, and the socket is closed again.

    Returns:
        str: The port number as a decimal string.

    Note:
        The socket is closed before returning, so nothing reserves the port;
        another process may take it before the caller binds to it.
    """
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp.bind(('', 0))
        # getsockname() yields (address, port); only the port is needed.
        port = tcp.getsockname()[1]
    finally:
        # Release the socket even if bind()/getsockname() raised.
        tcp.close()
    return str(port)
F
fuyw 已提交
50 51


B
Bo Zhou 已提交
52
def is_port_available(port):
    """Check whether a local port is free.

    A TCP connection to ``localhost:port`` is attempted. If the connection
    succeeds, something is already listening there and the port is reported
    as unavailable.

    Args:
        port: Port number, given as an int or a numeric string.

    Returns:
        bool: True if the port is available (the connection attempt failed),
        False if a listener accepted the connection.
    """
    port = int(port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex() returns 0 on success and an error code otherwise.
        error_code = sock.connect_ex(('localhost', port))
    finally:
        sock.close()
    return error_code != 0
F
fuyw 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86


def is_master_started(address):
    """Check whether a master node answers at ``address``.

    A ZeroMQ REQ socket connects to ``tcp://<address>``, sends a single
    ``[NORMAL]`` frame and waits for any reply, using a receive timeout of
    500 (``zmq.RCVTIMEO``).

    Args:
        address: The master address, formatted into ``tcp://{address}``.

    Returns:
        bool: True if a reply arrived before the receive timeout, False if
        the wait timed out (``zmq.error.Again``).
    """
    ctx = zmq.Context()
    # Named `sock` so the imported `socket` module is not shadowed.
    sock = ctx.socket(zmq.REQ)
    try:
        sock.linger = 0
        sock.setsockopt(zmq.RCVTIMEO, 500)
        sock.connect("tcp://{}".format(address))
        sock.send_multipart([b'[NORMAL]'])
        try:
            sock.recv_multipart()
        except zmq.error.Again:
            return False
        return True
    finally:
        # Close with zero linger first so that terminating the context
        # cannot wait on the undelivered request.
        sock.close(0)
        ctx.term()


# Root command group of the command-line interface. The sub-commands defined
# in this file are attached to it with `cli.add_command(...)` near the end of
# the module.
# NOTE(review): deliberately documented with comments rather than a
# docstring -- click presumably shows a docstring as help text; confirm
# before adding one.
@click.group()
def cli():
    pass


@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in debug mode to show all logs.",
    default=False)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port, debug):
    # Start a local cluster: one master, one worker and one monitor process.
    #
    # Parameters (filled in by click from the options above):
    #   port:         port of the master; must be free on localhost.
    #   cpu_num:      CPUs given to the local worker; falls back to
    #                 multiprocessing.cpu_count() when not set.
    #   monitor_port: port of the cluster monitor; a free port is picked
    #                 when not set.
    #   debug:        when truthy, the DEBUG environment variable is set.
    #
    # Raises:
    #   Exception: if `port` or an explicitly given `monitor_port` is
    #              already in use on localhost.
    if debug:
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = cpu_num if cpu_num else multiprocessing.cpu_count()
    monitor_port = monitor_port if monitor_port else get_free_tcp_port()

    # Locate the sibling scripts through os.path instead of slicing or
    # replacing parts of __file__, which breaks when __file__ carries no
    # directory component or does not end in the expected file name.
    remote_dir = os.path.dirname(os.path.abspath(__file__))
    start_file = os.path.join(remote_dir, 'start.py')
    monitor_file = os.path.join(remote_dir, 'monitor.py')
    master_address = "localhost:" + str(port)

    master_command = [
        sys.executable, start_file, "--name", "master", "--port", port
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        master_address, "--cpu_num",
        str(cpu_num)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", master_address
    ]

    # Redirect the output to DEVNULL to solve the warning log.
    with open(os.devnull, 'w') as devnull:
        for command in (master_command, worker_command, monitor_command):
            subprocess.Popen(
                command, stdout=devnull, stderr=subprocess.STDOUT)

    monitor_info = """
        # The Parl cluster is started at localhost:{}.

        # A local worker with {} CPUs is connected to the cluster.    

        # Starting the cluster monitor...""".format(
        port,
        cpu_num,
    )
    click.echo(monitor_info)

    # check if monitor is started
    cmd = r'ps -ef | grep remote/monitor.py\ --monitor_port\ {}\ --address\ localhost:{}'.format(
        monitor_port, port)

    # Poll the process list up to three times, three seconds apart. The
    # monitor counts as started when exactly two lines match.
    # NOTE(review): presumably those two lines are the monitor process and
    # the grep command itself -- confirm.
    monitor_is_started = False
    for _ in range(3):
        matching_lines = os.popen(cmd).read().strip().split('\n')
        if len(matching_lines) == 2:
            monitor_is_started = True
            break
        time.sleep(3)

    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)
F
fuyw 已提交
188

F
fuyw 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204

@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
def start_worker(address, cpu_num):
    # Launch a worker process that connects to an already running master.
    #
    # Parameters (filled in by click from the options above):
    #   address: address of the master node, forwarded to start.py.
    #   cpu_num: CPUs for the worker; forwarded as an empty string when
    #            not set.
    #
    # Raises:
    #   Exception: if no master answers at `address`.
    if not is_master_started(address):
        raise Exception("Worker can not connect to the master node, " +
                        "please check if the input address {} ".format(
                            address) + "is correct.")
    cpu_num = str(cpu_num) if cpu_num else ''

    # Locate start.py next to this file through os.path; slicing a fixed
    # number of characters off __file__ only works when the path ends in
    # exactly "/scripts.py".
    start_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "start.py")
    command = [
        sys.executable, start_file, "--name", "worker", "--address", address,
        "--cpu_num", cpu_num
    ]
    subprocess.Popen(command)


@click.command("stop", help="Exit the cluster.")
def stop():
    # Run one `pkill -f` per script, in this order: start.py, job.py and
    # monitor.py. Each command goes through the shell and its exit status
    # is ignored.
    kill_commands = (
        "pkill -f remote/start.py",
        "pkill -f remote/job.py",
        "pkill -f remote/monitor.py",
    )
    for kill_command in kill_commands:
        subprocess.call([kill_command], shell=True)
F
fuyw 已提交
220 221


222 223
@click.command("status")
def status():
    # Print a status report for every cluster that has a worker process
    # running on this machine.
    #
    # Clusters are discovered by grepping the process list for worker
    # command lines and extracting their `--address` value. For each
    # cluster the matching monitor process is looked up the same way; if one
    # exists, STATUS_TAG is sent to the master and the second frame of the
    # reply is printed as the cluster status.
    cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'
    content = os.popen(cmd).read().strip()
    address_pattern = re.compile('--address (.*?) --cpu')
    clusters = set(address_pattern.findall(content))
    if not clusters:
        click.echo('No active cluster is found.')
        return

    # Compiled once here instead of once per cluster.
    monitor_pattern = re.compile('--monitor_port (.*?)\n', re.S)
    ctx = zmq.Context()
    # Named `reports`/`sock` so that neither this function nor the imported
    # `socket` module is shadowed.
    reports = []
    try:
        for cluster in clusters:
            cmd = r'ps -ef | grep address\ {}'.format(cluster)
            content = os.popen(cmd).read()
            monitors = monitor_pattern.findall(content)

            if not monitors:
                msg = """
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster)
                reports.append(msg)
                continue

            # The captured text is split into three space-separated parts:
            # the monitor port, an ignored middle token and the master
            # address.
            monitor_port, _, master_address = monitors[0].split(' ')
            monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
            sock = ctx.socket(zmq.REQ)
            try:
                sock.setsockopt(zmq.RCVTIMEO, 10000)
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                monitor_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                click.echo(
                    'Can not connect to cluster {}, please try later.'.
                    format(master_address))
                continue
            finally:
                # Always release the socket, whatever happened above.
                sock.close(0)
            msg = """
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address)
            reports.append(msg)
    finally:
        # Every socket has been closed with zero linger by now.
        ctx.term()

    for report in reports:
        click.echo(report)


F
fuyw 已提交
272 273 274
# Attach every sub-command to the root `cli` group.
for _command in (start_worker, start_master, stop, status):
    cli.add_command(_command)
F
fuyw 已提交
276 277 278 279 280 281 282 283


def main():
    """Invoke the `cli` command group and pass its result back."""
    outcome = cli()
    return outcome


# Run the command-line interface when this file is executed as a script.
if __name__ == "__main__":
    main()