scripts.py 10.1 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import locale
F
fuyw 已提交
17
import multiprocessing
18 19 20 21
import os
import random
import re
import socket
F
fuyw 已提交
22
import subprocess
23
import sys
24
import time
F
fuyw 已提交
25
import threading
H
Hongsheng Zeng 已提交
26
import tempfile
F
fuyw 已提交
27
import warnings
F
fuyw 已提交
28
import zmq
F
fuyw 已提交
29
from multiprocessing import Process
30
from parl.utils import get_ip_address, to_str, _IS_WINDOWS, kill_process
31
from parl.remote.remote_constants import STATUS_TAG
F
fuyw 已提交
32 33 34 35 36 37

# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'

# Solve `Click will abort further execution because Python 3 was configured
# to use ASCII as encoding for the environment` error.
if not _IS_WINDOWS:
    try:
        locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
    except locale.Error:
        # en_US.UTF-8 may not be installed on this machine; keep the current
        # locale instead of failing at import time. Only the "unsupported
        # locale" error is ignored, so e.g. KeyboardInterrupt still propagates.
        pass

# TODO: this line will cause error in python2/macOS
if sys.version_info.major == 3:
    # ResourceWarning only exists on Python 3, hence the version guard.
    warnings.simplefilter("ignore", ResourceWarning)


F
fuyw 已提交
50 51 52 53 54
def get_free_tcp_port():
    """Ask the OS for a currently unused TCP port.

    Binds a throw-away IPv4 socket to port 0 on all interfaces, reads back
    the port number the OS assigned, and releases the socket again.

    Returns:
        str: the assigned port number, as a string.

    Note: the port is released before this function returns, so another
    process could grab it before the caller binds to it.
    """
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp.bind(('', 0))
        port = tcp.getsockname()[1]
    finally:
        # Close even if bind/getsockname fails so the descriptor never leaks.
        tcp.close()
    return str(port)


B
Bo Zhou 已提交
58
def is_port_available(port):
    """Check whether nothing is accepting connections on ``localhost:port``.

    Attempts a TCP connection to ``localhost:port``; if the attempt fails,
    the port is considered available.

    Args:
        port (int or str): the TCP port to probe.

    Returns:
        bool: True if the connection attempt failed (the port is available),
        False if something accepted the connection (the port is in use).
    """
    port = int(port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex reports failure as a non-zero errno instead of raising;
        # 0 means the connection succeeded, i.e. the port is already taken.
        return sock.connect_ex(('localhost', port)) != 0
    finally:
        sock.close()


def is_master_started(address):
    """Check whether a master node answers at ``address``.

    Sends a ``[NORMAL]`` request over a ZeroMQ REQ socket and waits up to
    500 ms for any reply.

    Args:
        address (str): ``ip:port`` of the master node.

    Returns:
        bool: True if a reply arrived before the timeout, False otherwise.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # Discard unsent messages on close so the context can be terminated
    # without blocking when the master is unreachable.
    sock.linger = 0
    sock.setsockopt(zmq.RCVTIMEO, 500)
    try:
        sock.connect("tcp://{}".format(address))
        sock.send_multipart([b'[NORMAL]'])
        sock.recv_multipart()
        return True
    except zmq.error.Again:
        # Receive timed out: nobody answered at this address.
        return False
    finally:
        # Release the socket and the context on every path; previously the
        # context was never terminated and the socket leaked on other errors.
        sock.close(0)
        ctx.term()


@click.group()
def cli():
    # Root `xparl` command group. Subcommands are attached to it with
    # cli.add_command(...) elsewhere in this module.
    # Intentionally no docstring: click uses a command's docstring as its
    # --help text, so adding one would change the CLI output.
    pass


@click.command("start", short_help="Start a master node.")
@click.option("--port", help="The port to bind to.", type=str, required=True)
@click.option(
    "--debug",
    help="Start parl in the debugging mode to print all running log.",
    is_flag=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
@click.option(
    "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port, debug):
    # Start a PARL cluster on this machine:
    #   * a master on localhost:<port>,
    #   * a local worker with <cpu_num> CPUs (all CPUs when unset, none when 0),
    #   * a cluster monitor on <monitor_port> (a free port when unset).
    # Each component is launched with subprocess.Popen and not waited on.
    # Raises Exception if the master or monitor port is already in use.
    #
    # Documented with comments rather than a docstring on purpose: click
    # would use a docstring as the `start` command's --help text.
    if debug:
        os.environ['DEBUG'] = 'True'

    if not is_port_available(port):
        raise Exception(
            "The master address localhost:{} is already in use.".format(port))

    if monitor_port and not is_port_available(monitor_port):
        raise Exception(
            "The input monitor port localhost:{} is already in use.".format(
                monitor_port))

    cpu_num = int(
        cpu_num) if cpu_num is not None else multiprocessing.cpu_count()

    # start.py and monitor.py sit next to this file; the '.pyc' replacement
    # covers running from a compiled module.
    start_file = __file__.replace('scripts.pyc', 'start.py')
    start_file = start_file.replace('scripts.py', 'start.py')
    monitor_file = __file__.replace('scripts.pyc', 'monitor.py')
    monitor_file = monitor_file.replace('scripts.py', 'monitor.py')

    monitor_port = monitor_port if monitor_port else get_free_tcp_port()

    master_command = [
        sys.executable, start_file, "--name", "master", "--port", port
    ]
    worker_command = [
        sys.executable, start_file, "--name", "worker", "--address",
        "localhost:" + str(port), "--cpu_num",
        str(cpu_num)
    ]
    monitor_command = [
        sys.executable, monitor_file, "--monitor_port",
        str(monitor_port), "--address", "localhost:" + str(port)
    ]

    FNULL = open(os.devnull, 'w')
    try:
        # Redirect the output to DEVNULL to solve the warning log.
        subprocess.Popen(
            master_command, stdout=FNULL, stderr=subprocess.STDOUT)

        if cpu_num > 0:
            # Sleep 1s for master ready
            time.sleep(1)
            subprocess.Popen(
                worker_command, stdout=FNULL, stderr=subprocess.STDOUT)

        if _IS_WINDOWS:
            # TODO(@zenghsh3) redirecting stdout of monitor subprocess to FNULL will cause occasional failure
            tmp_file = tempfile.TemporaryFile()
            try:
                subprocess.Popen(monitor_command, stdout=tmp_file)
            finally:
                tmp_file.close()
        else:
            subprocess.Popen(
                monitor_command, stdout=FNULL, stderr=subprocess.STDOUT)
    finally:
        # Close the parent's handle even if a Popen call fails; the spawned
        # children keep their own duplicated descriptors.
        FNULL.close()

    if cpu_num > 0:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # A local worker with {} CPUs is connected to the cluster.    

            # Starting the cluster monitor...""".format(
            port,
            cpu_num,
        )
    else:
        monitor_info = """
            # The Parl cluster is started at localhost:{}.

            # Starting the cluster monitor...""".format(port)
    click.echo(monitor_info)

    # check if monitor is started: look for its process in the process list.
    if _IS_WINDOWS:
        cmd = r'''wmic process where "commandline like '%remote\\monitor.py --monitor_port {} --address localhost:{}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
            monitor_port, port)
    else:
        cmd = r'ps -ef | grep -v grep | grep remote/monitor.py\ --monitor_port\ {}\ --address\ localhost:{}'.format(
            monitor_port, port)
    monitor_is_started = _wait_for_output(cmd, retries=3, interval=3)

    master_ip = get_ip_address()
    if monitor_is_started:
        start_info = """
        ## If you want to check cluster status, please view:

            http://{}:{}

        or call:

            xparl status""".format(master_ip, monitor_port)
    else:
        start_info = "# Fail to start the cluster monitor."

    monitor_info = """
        {}

        ## If you want to add more CPU resources, please call:

            xparl connect --address {}:{}

        ## If you want to shutdown the cluster, please call:

            xparl stop        
        """.format(start_info, master_ip, port)
    click.echo(monitor_info)


def _wait_for_output(cmd, retries=3, interval=3):
    """Run shell command ``cmd`` repeatedly until it prints something.

    Args:
        cmd (str): shell command, executed through ``os.popen``.
        retries (int): maximum number of attempts.
        interval (float): seconds to sleep between attempts. No sleep
            follows the last attempt, so a failure is reported right away.

    Returns:
        bool: True as soon as ``cmd`` produces non-empty output, False if no
        attempt produced any.
    """
    for attempt in range(retries):
        pipe = os.popen(cmd)
        try:
            output = pipe.read()
        finally:
            pipe.close()
        if output:
            return True
        if attempt < retries - 1:
            time.sleep(interval)
    return False

F
fuyw 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233

@click.command("connect", short_help="Start a worker node.")
@click.option(
    "--address", help="IP address of the master node.", required=True)
@click.option(
    "--cpu_num",
    type=int,
    help="Set number of cpu manually. If not set, it will use all "
    "cpus of this machine.")
def start_worker(address, cpu_num):
    # Launch a worker process (start.py --name worker) that joins the master
    # at `address`; refuses to start if that master does not answer.
    # Comments instead of a docstring: click would show a docstring as the
    # `connect` command's --help text.
    if not is_master_started(address):
        raise Exception(
            "Worker can not connect to the master node, please check if the "
            "input address {} is correct.".format(address))

    # An unset (or zero) CPU count is forwarded to start.py as ''.
    cpu_arg = str(cpu_num) if cpu_num else ''

    start_file = __file__.replace('scripts.pyc', 'start.py')
    start_file = start_file.replace('scripts.py', 'start.py')

    subprocess.Popen([
        sys.executable, start_file, "--name", "worker", "--address", address,
        "--cpu_num", cpu_arg
    ])


@click.command("stop", help="Exit the cluster.")
def stop():
    # Ask kill_process to terminate the processes of PARL's remote scripts:
    # start.py (masters and workers), job.py and monitor.py, in that order.
    # NOTE(review): the matching rule lives in parl.utils.kill_process.
    for script_path in ('remote/start.py', 'remote/job.py',
                        'remote/monitor.py'):
        kill_process(script_path)


252 253
@click.command("status")
def status():
    # Print the status of every cluster that has a worker process running on
    # this machine. Each cluster's master is queried over ZeroMQ with
    # STATUS_TAG, and the URL of its local monitor is printed.
    # Comments instead of a docstring: click would show a docstring as the
    # `status` command's --help text.
    if _IS_WINDOWS:
        cmd = r'''wmic process where "commandline like '%remote\\start.py --name worker --address%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''
    else:
        cmd = r'ps -ef | grep remote/start.py\ --name\ worker\ --address'

    content = os.popen(cmd).read().strip()
    pattern = re.compile('--address (.*?) --cpu')
    clusters = set(pattern.findall(content))
    if len(clusters) == 0:
        click.echo('No active cluster is found.')
        return

    # Compiled once instead of once per cluster.
    monitor_pattern = re.compile('--monitor_port (.*?)\n', re.S)
    ctx = zmq.Context()
    cluster_status = []
    try:
        for cluster in clusters:
            if _IS_WINDOWS:
                cmd = r'''wmic process where "commandline like '%address {}%'" get commandline /format:list | findstr /V wmic | findstr CommandLine='''.format(
                    cluster)
            else:
                cmd = r'ps -ef | grep address\ {}'.format(cluster)
            content = os.popen(cmd).read()
            monitors = monitor_pattern.findall(content)

            if not monitors:
                cluster_status.append("""
            # Cluster {} fails to start the cluster monitor.
                """.format(cluster))
                continue

            # The capture looks like "<monitor_port> --address <master>".
            # split() on any whitespace also tolerates a stray trailing
            # '\r' or space in the command output.
            monitor_port, _, master_address = monitors[0].split()
            monitor_address = "{}:{}".format(get_ip_address(), monitor_port)

            sock = ctx.socket(zmq.REQ)
            sock.setsockopt(zmq.RCVTIMEO, 10000)
            try:
                sock.connect('tcp://{}'.format(master_address))
                sock.send_multipart([STATUS_TAG])
                monitor_info = to_str(sock.recv_multipart()[1])
            except zmq.error.Again:
                click.echo(
                    'Can not connect to cluster {}, please try later.'.format(
                        master_address))
                continue
            finally:
                # linger=0 drops any unsent request so ctx.term() cannot hang.
                sock.close(0)

            cluster_status.append("""
            # Cluster {} {}

            # If you want to check cluster status, please view: http://{}
            """.format(master_address, monitor_info, monitor_address))
    finally:
        # Previously the context was never terminated.
        ctx.term()

    for monitor_status in cluster_status:
        click.echo(monitor_status)


F
fuyw 已提交
310 311 312
# Attach every subcommand to the root `xparl` group, in the original order.
for _command in (start_worker, start_master, stop, status):
    cli.add_command(_command)
del _command


def main():
    """Entry point: hand the command line over to the ``cli`` click group."""
    return cli()


if __name__ == "__main__":
    main()