scripts.py 6.9 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20 21
import os
import random
import re
import socket
F
fuyw 已提交
22
import subprocess
23
import sys
F
fuyw 已提交
24 25
import threading
import warnings
F
fuyw 已提交
26
import zmq
F
fuyw 已提交
27
from multiprocessing import Process
28 29
from parl.utils import get_ip_address, to_str
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
30 31 32 33 34 35 36 37

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error by switching to a UTF-8
# locale. Not every system has en_US.UTF-8 installed (minimal images often
# only ship C.UTF-8), so try the candidates in order instead of crashing at
# import time with locale.Error; if none is available the locale is left
# unchanged.
for _utf8_locale in ("en_US.UTF-8", "C.UTF-8"):
    try:
        locale.setlocale(locale.LC_ALL, _utf8_locale)
        break
    except locale.Error:
        continue
del _utf8_locale

# TODO: this line will cause error in python2/macOS. ResourceWarning only
# exists on Python 3, hence the version guard.
if sys.version_info.major == 3:
    warnings.simplefilter("ignore", ResourceWarning)
F
fuyw 已提交
41 42


F
fuyw 已提交
43 44 45 46 47
def get_free_tcp_port():
    """Ask the operating system for an unused TCP port.

    A throwaway IPv4 socket is bound to port 0 so the kernel picks a free
    port; the socket is then closed, so nothing reserves the port for the
    caller afterwards.

    Returns:
        str: the selected port number, as a decimal string.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.bind(('', 0))
    free_port = probe.getsockname()[1]
    probe.close()
    return '{}'.format(free_port)
F
fuyw 已提交
49 50


B
Bo Zhou 已提交
51
def is_port_available(port):
    """Check whether nothing accepts connections on a local TCP port.

    Tries to connect to ``localhost:port`` over IPv4. If the connection
    fails, the port is considered available.

    Args:
        port (int or str): the port number; converted with ``int``.

    Returns:
        bool: True if the port is available (connection failed), False if
        something accepted the connection.
    """
    port = int(port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 when the connection succeeds (port in use) and
        # an error indicator otherwise, instead of raising.
        return sock.connect_ex(('localhost', port)) != 0
    finally:
        # try/finally rather than `with`: Python 2 sockets are not context
        # managers.
        sock.close()
F
fuyw 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90


def is_master_started(address):
    """Probe whether a master answers at ``address``.

    Sends a ``[NORMAL]`` request over a zmq REQ socket and waits up to 500 ms
    for any reply. The reply content is ignored.

    Args:
        address (str): ``host:port`` of the master; used as
            ``tcp://<address>``.

    Returns:
        bool: True if a reply arrived within the timeout, False otherwise.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # Drop pending messages immediately on close instead of lingering.
    sock.linger = 0
    sock.setsockopt(zmq.RCVTIMEO, 500)
    try:
        sock.connect("tcp://{}".format(address))
        sock.send_multipart([b'[NORMAL]'])
        sock.recv_multipart()
        return True
    except zmq.error.Again:
        # RCVTIMEO expired without a reply.
        return False
    finally:
        # Release both the socket and the context; the context used to leak
        # on every call.
        sock.close(0)
        ctx.term()


@click.group()
def cli():
    # Root click command group; the subcommands (start, connect, stop,
    # status) are attached to it with cli.add_command. A comment is used
    # instead of a docstring because click would show a docstring as the
    # group's --help text.
    pass


@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port):
    # Start a local cluster and print usage hints. It launches three
    # processes: a master on `port`, a local worker with `cpu_num` CPUs, and
    # a monitor on `monitor_port`. A free port is picked for the monitor if
    # none is given.
    # Comments are used instead of a docstring because click would turn a
    # docstring into the command's --help text.
    #
    # Raises:
    #     Exception: if `port`, or the given `monitor_port`, is already in
    #         use on localhost.
    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = cpu_num if cpu_num else multiprocessing.cpu_count()

    # start.py and monitor.py live next to this file. Resolve them from the
    # directory of the absolute path so both scripts.py and scripts.pyc work,
    # as does a relative __file__ with no '/'.
    remote_dir = os.path.dirname(os.path.abspath(__file__))
    start_file = os.path.join(remote_dir, 'start.py')
    monitor_file = os.path.join(remote_dir, 'monitor.py')

    # The master keeps its output attached to this terminal.
    command = [sys.executable, start_file, "--name", "master", "--port", port]
    subprocess.Popen(command)

    # Redirect the output to DEVNULL to solve the warning log. The `with`
    # block closes our handle even if a Popen call fails; the children keep
    # their own copies of the descriptor.
    with open(os.devnull, 'w') as fnull:
        command = [
            sys.executable, start_file, "--name", "worker", "--address",
            "localhost:" + str(port), "--cpu_num",
            str(cpu_num)
        ]
        subprocess.Popen(command, stdout=fnull, stderr=subprocess.STDOUT)

        monitor_port = monitor_port if monitor_port else get_free_tcp_port()

        command = [
            sys.executable, monitor_file, "--monitor_port",
            str(monitor_port), "--address", "localhost:" + str(port)
        ]
        subprocess.Popen(command, stdout=fnull, stderr=subprocess.STDOUT)

    master_ip = get_ip_address()
    cluster_info = """
        # The Parl cluster is started at localhost:{}.

        # A local worker with {} CPUs is connected to the cluster.

        ## If you want to check cluster status, please view:

            http://{}:{}.

        or call:

            xparl status

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop
        """.format(port, cpu_num, master_ip, monitor_port, master_ip, port)

    click.echo(cluster_info)

F
fuyw 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address",
    help="IP address of the master node, in ip:port format.",
    required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
def start_worker(address, cpu_num):
    # Launch a worker process (start.py --name worker) that joins the master
    # at `address`. Comments are used instead of a docstring because click
    # would show a docstring as the command's --help text.
    #
    # Raises:
    #     Exception: if no master replies at `address`.
    if not is_master_started(address):
        raise Exception("Worker can not connect to the master node, " +
                        "please check if the input address {} ".format(
                            address) + "is correct.")
    # An empty --cpu_num is forwarded when the option is unset (or 0). How
    # start.py interprets it is not visible here; presumably it means "use
    # all CPUs". TODO confirm.
    cpu_num = str(cpu_num) if cpu_num else ''
    # start.py sits next to this file. Resolving it from the absolute path
    # replaces the old `__file__[:-11]` slice, which depended on the exact
    # length of the file name and broke for a relative __file__.
    start_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'start.py')
    command = [
        sys.executable, start_file, "--name", "worker", "--address", address,
        "--cpu_num", cpu_num
    ]
    subprocess.Popen(command)


@click.command("stop", help="Exit the cluster.")
def stop():
    # Kill the local parl processes whose full command line contains
    # remote/start.py, remote/job.py or remote/monitor.py, in that order.
    for pattern in ("remote/start.py", "remote/job.py", "remote/monitor.py"):
        try:
            # Run pkill directly instead of through a shell. This avoids an
            # intermediate `sh -c` process whose own command line contains
            # the pattern. The exit status (1 when nothing matched) is
            # deliberately ignored, so stopping stays best-effort.
            subprocess.call(["pkill", "-f", pattern])
        except OSError as e:
            # pkill is not installed. Report it rather than crash; the later
            # patterns would fail the same way, so stop here.
            click.echo("Failed to run pkill: {}".format(e), err=True)
            return
F
fuyw 已提交
184 185


186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
@click.command("status")
def status():
    # Print a short report for every cluster whose monitor runs on this
    # machine. Monitors are found by grepping `ps -ef`; the tail of each
    # monitor command line is "<monitor_port> --address <master_ip>:<port>".
    # Comments are used instead of a docstring because click would show a
    # docstring as the command's --help text.
    reply_timeout_ms = 5000  # give up on a master that does not answer
    info_template = """
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """
    no_reply_template = """
            # Cluster {} did not reply to the status request.
            """
    cmd = r'ps -ef | grep remote/monitor.py\ --monitor_port'
    content = os.popen(cmd).read()
    # Requiring a space after the flag presumably also skips the grep process
    # itself, whose command line ends right after "--monitor_port".
    pattern = re.compile('--monitor_port (.*?)\n', re.S)
    monitors = pattern.findall(content)
    if not monitors:
        click.echo('No active cluster is found.')
        return

    ctx = zmq.Context()
    messages = []
    try:
        for monitor in monitors:
            monitor_port, _, master_address = monitor.split(' ')
            master_ip = master_address.split(':')[0]
            monitor_address = "{}:{}".format(master_ip, monitor_port)
            sock = ctx.socket(zmq.REQ)
            sock.linger = 0
            # Without a receive timeout, a dead master would hang status
            # forever.
            sock.setsockopt(zmq.RCVTIMEO, reply_timeout_ms)
            try:
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                cluster_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                cluster_info = None
            finally:
                sock.close(0)
            if cluster_info is None:
                messages.append(no_reply_template.format(master_address))
            else:
                messages.append(
                    info_template.format(master_address, cluster_info,
                                         monitor_address))
    finally:
        ctx.term()

    for message in messages:
        click.echo(message)


F
fuyw 已提交
216 217 218
# Attach every subcommand to the root group, in registration order.
for _command in (start_worker, start_master, stop, status):
    cli.add_command(_command)
del _command
F
fuyw 已提交
220 221 222 223 224 225 226 227


def main():
    """Entry point of the ``xparl`` command-line tool: dispatch to ``cli``."""
    result = cli()
    return result


if __name__ == "__main__":
    main()