diff --git a/parl/remote/client.py b/parl/remote/client.py index fd17adb75babfa6ed86e82d460fd3025ad538cc5..379459c5768914a012cf89182724f1233cbf1329 100644 --- a/parl/remote/client.py +++ b/parl/remote/client.py @@ -59,6 +59,7 @@ class Client(object): self.heartbeat_socket_initialized = threading.Event() self.master_is_alive = True self.client_is_alive = True + self.log_monitor_url = None self.executable_path = self.get_executable_path() @@ -142,14 +143,19 @@ class Client(object): thread.start() self.heartbeat_socket_initialized.wait() + self.client_id = self.reply_master_heartbeat_address.replace(':', '_') + \ + '_' + str(int(time.time())) + # check if the master is connected properly try: self.submit_job_socket.send_multipart([ remote_constants.CLIENT_CONNECT_TAG, - to_byte(self.heartbeat_master_address), - to_byte(socket.gethostname()) + to_byte(self.reply_master_heartbeat_address), + to_byte(socket.gethostname()), + to_byte(self.client_id), ]) - _ = self.submit_job_socket.recv_multipart() + message = self.submit_job_socket.recv_multipart() + self.log_monitor_url = to_str(message[1]) except zmq.error.Again as e: logger.warning("[Client] Can not connect to the master, please " "check if master is started and ensure the input " @@ -160,16 +166,16 @@ class Client(object): "address {} is correct.".format(master_address)) def _reply_heartbeat(self): - """Reply heartbeat signals to the specific node.""" + """Reply heartbeat signals to the master node.""" socket = self.ctx.socket(zmq.REP) socket.linger = 0 socket.setsockopt(zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000) - heartbeat_master_port =\ + reply_master_heartbeat_port =\ socket.bind_to_random_port(addr="tcp://*") - self.heartbeat_master_address = "{}:{}".format(get_ip_address(), - heartbeat_master_port) + self.reply_master_heartbeat_address = "{}:{}".format( + get_ip_address(), reply_master_heartbeat_port) self.heartbeat_socket_initialized.set() connected = False while self.client_is_alive and self.master_is_alive: @@ -181,9 +187,9 @@ class Client(object): remote_constants.HEARTBEAT_TAG, to_byte(self.executable_path), to_byte(str(self.actor_num)), - to_byte(str(elapsed_time)) - ]) - connected = True + to_byte(str(elapsed_time)), + to_byte(str(self.log_monitor_url)), + ]) # TODO: remove additional information except zmq.error.Again as e: if connected: logger.warning("[Client] Cannot connect to the master." @@ -200,7 +206,7 @@ class Client(object): def _check_and_monitor_job(self, job_heartbeat_address, ping_heartbeat_address, max_memory): """ Sometimes the client may receive a job that is dead, thus - we have to check if this job is still alive before sending it to the actor. + we have to check if this job is still alive before adding it to the `actor_num`. """ # job_heartbeat_socket: sends heartbeat signal to job job_heartbeat_socket = self.ctx.socket(zmq.REQ) @@ -289,7 +295,8 @@ class Client(object): self.lock.acquire() self.submit_job_socket.send_multipart([ remote_constants.CLIENT_SUBMIT_TAG, - to_byte(self.heartbeat_master_address) + to_byte(self.reply_master_heartbeat_address), + to_byte(self.client_id), ]) message = self.submit_job_socket.recv_multipart() self.lock.release() @@ -356,6 +363,8 @@ def connect(master_address, distributed_files=[]): if GLOBAL_CLIENT.process_id != cur_process_id: GLOBAL_CLIENT = Client(master_address, cur_process_id, distributed_files) + logger.info("Remote actors log url: {}".format( + GLOBAL_CLIENT.log_monitor_url)) def get_global_client(): diff --git a/parl/remote/cluster_monitor.py b/parl/remote/cluster_monitor.py index b42d3ca0d80cb2c0f2c02e892c4d61c1f38d9e4c..889f91586161f94cddb2a16670360cc8b9d4aca0 100644 --- a/parl/remote/cluster_monitor.py +++ b/parl/remote/cluster_monitor.py @@ -28,7 +28,8 @@ class ClusterMonitor(object): def __init__(self): self.status = { 'workers': defaultdict(dict), - 'clients': defaultdict(dict) + 'clients': defaultdict(dict), + 'client_jobs': defaultdict(dict), } self.lock = threading.Lock() @@ -46,6 +47,11 @@ class ClusterMonitor(object): worker_status['hostname'] = hostname self.lock.release() + def add_client_job(self, client_id, job_info): + self.lock.acquire() + self.status['client_jobs'][client_id].update(job_info) + self.lock.release() + def update_client_status(self, client_status, client_address, client_hostname): """Update client status with message send from client heartbeat. @@ -61,7 +67,8 @@ class ClusterMonitor(object): 'client_address': client_hostname, 'file_path': to_str(client_status[1]), 'actor_num': int(to_str(client_status[2])), - 'time': to_str(client_status[3]) + 'time': to_str(client_status[3]), + 'log_monitor_url': to_str(client_status[4]), } self.lock.release() @@ -96,11 +103,11 @@ class ClusterMonitor(object): self.status['workers'].pop(worker_address) self.lock.release() - def drop_cluster_status(self, client_address): - """Drop cluster status when it exits. + def drop_client_status(self, client_address): + """Drop client status when it exits. Args: - cluster_address (str): IP address of the exited client. + client_address (str): IP address of the exited client. """ self.lock.acquire() if client_address in self.status['clients']: diff --git a/parl/remote/job.py b/parl/remote/job.py index 17a280fb4e2cad7e87a01f18c26bc680198b09fb..483c631b1bb1a2e5bab07be53e53eab27957c3f7 100644 --- a/parl/remote/job.py +++ b/parl/remote/job.py @@ -48,7 +48,7 @@ class Job(object): """ - def __init__(self, worker_address): + def __init__(self, worker_address, log_server_address): """ Args: worker_address(str): worker_address for sending job information(e.g, pid) @@ -60,13 +60,15 @@ class Job(object): self.max_memory = None self.job_address_receiver, job_address_sender = Pipe() + self.job_id_receiver, job_id_sender = Pipe() self.worker_address = worker_address + self.log_server_address = log_server_address self.job_ip = get_ip_address() self.pid = os.getpid() self.run_job_process = Process( - target=self.run, args=(job_address_sender, )) + target=self.run, args=(job_address_sender, job_id_sender)) self.run_job_process.start() """ NOTE: @@ -102,6 +104,7 @@ class Job(object): """ # wait for another process to create reply socket self.job_address = self.job_address_receiver.recv() + self.job_id = self.job_id_receiver.recv() self.ctx = zmq.Context() # create the job_socket @@ -135,7 +138,8 @@ class Job(object): # sends job information to the worker initialized_job = InitializedJob( self.job_address, worker_heartbeat_address, - client_heartbeat_address, ping_heartbeat_address, None, self.pid) + client_heartbeat_address, ping_heartbeat_address, None, self.pid, + self.job_id, self.log_server_address) self.job_socket.send_multipart( [remote_constants.NORMAL_TAG, cloudpickle.dumps(initialized_job)]) @@ -332,7 +336,7 @@ class Job(object): return obj - def run(self, job_address_sender): + def run(self, job_address_sender, job_id_sender): """An infinite loop waiting for a new task. Args: @@ -347,7 +351,15 @@ class Job(object): job_ip = get_ip_address() job_address = "{}:{}".format(job_ip, job_port) + job_id = job_address.replace(':', '_') + '_' + str(int(time.time())) + self.log_dir = os.path.expanduser('~/.parl_data/job/{}'.format(job_id)) + logger.set_dir(self.log_dir) + logger.info( + "[Job] Job {} initialized. Reply heartbeat socket Address: {}.". + format(job_id, job_address)) + job_address_sender.send(job_address) + job_id_sender.send(job_id) try: # receive source code from the actor and append them to the environment variables. @@ -391,7 +403,15 @@ class Job(object): function_name = to_str(message[1]) data = message[2] args, kwargs = loads_argument(data) - ret = getattr(obj, function_name)(*args, **kwargs) + + # Redirect stdout to stdout.log temporarily + logfile_path = os.path.join(self.log_dir, 'stdout.log') + with open(logfile_path, 'a') as f: + tmp = sys.stdout + sys.stdout = f + ret = getattr(obj, function_name)(*args, **kwargs) + sys.stdout = tmp + ret = dumps_return(ret) reply_socket.send_multipart( @@ -450,5 +470,10 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--worker_address", required=True, type=str, help="worker_address") + parser.add_argument( + "--log_server_address", + required=True, + type=str, + help="log_server_address, address of the log web server on worker") args = parser.parse_args() - job = Job(args.worker_address) + job = Job(args.worker_address, args.log_server_address) diff --git a/parl/remote/log_server.py b/parl/remote/log_server.py new file mode 100644 index 0000000000000000000000000000000000000000..f3ad1cf882311b9bdffcd990e7b33ddff0711bc2 --- /dev/null +++ b/parl/remote/log_server.py @@ -0,0 +1,102 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import linecache +import os + +from flask import Flask, current_app, jsonify, make_response, request, send_file +from flask_cors import CORS + +app = Flask(__name__) +CORS(app) + + +@app.route( + "/get-log", methods=[ + 'GET', + ]) +def get_log(): + ''' + args: + job_id: id of the remote job + response: + log: newest `LINE_NUM` lines of the log file + ''' + try: + job_id = request.args['job_id'] + except: + return make_response( + jsonify(message="No job_id provided, please check your request."), + 400) + + log_dir = current_app.config.get('LOG_DIR') + log_dir = os.path.expanduser(log_dir) + log_file_path = os.path.join(log_dir, job_id, 'stdout.log') + if not os.path.isfile(log_file_path): + return make_response( + jsonify(message="Log not exsits, please check your job_id"), 400) + else: + line_num = current_app.config.get('LINE_NUM') + linecache.checkcache(log_file_path) + log_content = ''.join(linecache.getlines(log_file_path)[-line_num:]) + return make_response( + jsonify(message="Log exsits, content in log", log=log_content), + 200) + + +@app.route( + '/download-log', methods=[ + 'GET', + ]) +def download_log(): + ''' + args: + job_id: the id of the remote job + response: + log: log file + ''' + try: + job_id = request.args['job_id'] + except: + return make_response( + jsonify(message="No job_id provided, please check your request."), + 400) + log_dir = current_app.config.get('LOG_DIR') + log_dir = os.path.expanduser(log_dir) + log_file_path = os.path.join(log_dir, job_id, 'stdout.log') + if not os.path.isfile(log_file_path): + return make_response( + jsonify(message="Log not exsits, please check your job_id"), 400) + else: + return send_file(log_file_path, as_attachment=True) + + +if __name__ == "__main__": + import logging + log = logging.getLogger('werkzeug') + log.disabled = True + + parser = argparse.ArgumentParser() + parser.add_argument('--port', required=True, type=int) + parser.add_argument('--log_dir', required=True, type=str) + parser.add_argument('--line_num', required=True, type=int) + args = parser.parse_args() + + app.config.from_mapping( + LOG_DIR=args.log_dir, + LINE_NUM=args.line_num, + ) + + app.run(host="0.0.0.0", port=args.port) diff --git a/parl/remote/master.py b/parl/remote/master.py index 4c96252309f977b5ffebf17c8dee2a219b6ca168..8cca0290a7ad68407026f2e24c4613da83af56a3 100644 --- a/parl/remote/master.py +++ b/parl/remote/master.py @@ -57,9 +57,10 @@ class Master(object): port: The ip port that the master node binds to. """ - def __init__(self, port): + def __init__(self, port, monitor_port=None): self.ctx = zmq.Context() self.master_ip = get_ip_address() + self.monitor_url = "http://{}:{}".format(self.master_ip, monitor_port) logger.set_dir( os.path.expanduser('~/.parl_data/master/{}_{}'.format( self.master_ip, port))) @@ -135,7 +136,7 @@ class Master(object): except zmq.error.Again as e: client_is_alive = False - self.cluster_monitor.drop_cluster_status( + self.cluster_monitor.drop_client_status( client_heartbeat_address) logger.warning("[Master] cannot connect to the client " + "{}. ".format(client_heartbeat_address) + @@ -205,8 +206,11 @@ class Master(object): # a client connects to the master elif tag == remote_constants.CLIENT_CONNECT_TAG: + # `client_heartbeat_address` is the + # `reply_master_heartbeat_address` of the client client_heartbeat_address = to_str(message[1]) client_hostname = to_str(message[2]) + client_id = to_str(message[3]) self.client_hostname[client_heartbeat_address] = client_hostname logger.info( "Client {} is connected.".format(client_heartbeat_address)) @@ -215,11 +219,14 @@ class Master(object): target=self._create_client_monitor, args=(client_heartbeat_address, )) thread.start() - self.client_socket.send_multipart([remote_constants.NORMAL_TAG]) + log_monitor_address = "{}/logs?client_id={}".format( + self.monitor_url, client_id) + self.client_socket.send_multipart( + [remote_constants.NORMAL_TAG, + to_byte(log_monitor_address)]) # a client submits a job to the master elif tag == remote_constants.CLIENT_SUBMIT_TAG: - # check available CPU resources if self.cpu_num: logger.info("Submitting job...") @@ -230,6 +237,9 @@ class Master(object): to_byte(job.client_heartbeat_address), to_byte(job.ping_heartbeat_address), ]) + client_id = to_str(message[2]) + job_info = {job.job_id: job.log_server_address} + self.cluster_monitor.add_client_job(client_id, job_info) self._print_workers() else: self.client_socket.send_multipart([remote_constants.CPU_TAG]) diff --git a/parl/remote/message.py b/parl/remote/message.py index 8be8d4657110011c34cca8702290a9942d225e36..97e5482f9e5a25fe52b6919494f4dde1b21e7d5b 100644 --- a/parl/remote/message.py +++ b/parl/remote/message.py @@ -14,9 +14,15 @@ class InitializedJob(object): - def __init__(self, job_address, worker_heartbeat_address, - client_heartbeat_address, ping_heartbeat_address, - worker_address, pid): + def __init__(self, + job_address, + worker_heartbeat_address, + client_heartbeat_address, + ping_heartbeat_address, + worker_address, + pid, + job_id=None, + log_server_address=None): """ Args: job_address(str): Job address to which the new task connect. @@ -35,6 +41,8 @@ class InitializedJob(object): self.worker_address = worker_address self.pid = pid self.is_alive = True + self.job_id = job_id + self.log_server_address = log_server_address class InitializedWorker(object): diff --git a/parl/remote/monitor.py b/parl/remote/monitor.py index a959f159dda3fbc3745ff203c8fb547e02dc4fed..452888940c4eb8de94632f5adc55a097255e94c0 100644 --- a/parl/remote/monitor.py +++ b/parl/remote/monitor.py @@ -19,7 +19,7 @@ import time import zmq import threading -from flask import Flask, render_template, jsonify +from flask import Flask, render_template, jsonify, request app = Flask(__name__) @@ -81,6 +81,7 @@ class ClusterMonitor(object): data['total_vacant_cpus'] = total_vacant_cpus data['total_cpus'] = total_used_cpus + total_vacant_cpus data['clients'] = list(status['clients'].values()) + data['client_jobs'] = status['client_jobs'] self.data = data time.sleep(10) @@ -99,6 +100,39 @@ def cluster(): return jsonify(data) +@app.route( + '/logs', methods=[ + 'GET', + ]) +def logs(): + client_id = request.args.get('client_id') + return render_template('jobs.html', client_id=client_id) + + +@app.route( + '/get-jobs', methods=[ + 'GET', + ]) +def get_jobs(): + client_id = request.args.get('client_id') + jobs = CLUSTER_MONITOR.get_data()['client_jobs'].get(client_id) + data = [] + if jobs: + for idx, job_id in enumerate(jobs): + monitor_url = jobs[job_id] + data.append({ + "id": + idx, + "job_id": + job_id, + "log_url": + "http://{}/get-log?job_id={}".format(monitor_url, job_id), + "download_url": + "http://{}/download-log?job_id={}".format(monitor_url, job_id), + }) + return jsonify(data) + + if __name__ == "__main__": import logging log = logging.getLogger('werkzeug') diff --git a/parl/remote/scripts.py b/parl/remote/scripts.py index 08a06c2eab3678673117bee01cd3cbb2841138c4..51cf3cabf7cb3a3dba0cc3d0d00a7ad55406b4f8 100644 --- a/parl/remote/scripts.py +++ b/parl/remote/scripts.py @@ -18,7 +18,7 @@ import multiprocessing import os import random import re -import socket +import requests import subprocess import sys import time @@ -27,7 +27,9 @@ import tempfile import warnings import zmq from multiprocessing import Process -from parl.utils import get_ip_address, to_str, _IS_WINDOWS, kill_process +from parl.utils import (_IS_WINDOWS, get_free_tcp_port, get_ip_address, + get_port_from_range, is_port_available, kill_process, + to_str) from parl.remote.remote_constants import STATUS_TAG # A flag to mark if parl is started from a command line @@ -47,26 +49,6 @@ if sys.version_info.major == 3: warnings.simplefilter("ignore", ResourceWarning) -def get_free_tcp_port(): - tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp.bind(('', 0)) - addr, port = tcp.getsockname() - tcp.close() - return str(port) - - -def is_port_available(port): - """ Check if a port is used. - - True if the port is available for connection. - """ - port = int(port) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - available = sock.connect_ex(('localhost', port)) - sock.close() - return available - - def is_master_started(address): ctx = zmq.Context() socket = ctx.socket(zmq.REQ) @@ -83,6 +65,33 @@ def is_master_started(address): return False +def parse_port_range(log_server_port_range): + try: + re.match(r'\d*[-]\d*', log_server_port_range).span() + except: + raise Exception( + "The input log_server_port_range should be `start-end` format.") + start, end = map(int, log_server_port_range.split('-')) + if start > end: + raise Exception( + "Start port number must be smaller than the end port number.") + + return start, end + + +def is_log_server_started(ip_address, port): + started = False + for _ in range(3): + try: + r = requests.get("http://{}:{}/get-log".format(ip_address, port)) + if r.status_code == 400: + started = True + break + except: + time.sleep(3) + return started + + @click.group() def cli(): pass @@ -101,7 +110,15 @@ def cli(): "cpus of this machine.") @click.option( "--monitor_port", help="The port to start a cluster monitor.", type=str) -def start_master(port, cpu_num, monitor_port, debug): +@click.option( + "--log_server_port_range", + help=''' + Port range (start-end) of the log server on the worker. Default: 8000-9000. + The worker will pick a random avaliable port in [start, end] for the log server. + ''', + default="8000-9000", + type=str) +def start_master(port, cpu_num, monitor_port, debug, log_server_port_range): if debug: os.environ['DEBUG'] = 'True' @@ -122,14 +139,26 @@ def start_master(port, cpu_num, monitor_port, debug): monitor_file = monitor_file.replace('scripts.py', 'monitor.py') monitor_port = monitor_port if monitor_port else get_free_tcp_port() + start, end = parse_port_range(log_server_port_range) + log_server_port = get_port_from_range(start, end) + while log_server_port == monitor_port or log_server_port == port: + log_server_port = get_port_from_range(start, end) master_command = [ - sys.executable, start_file, "--name", "master", "--port", port + sys.executable, + start_file, + "--name", + "master", + "--port", + port, + "--monitor_port", + monitor_port, ] worker_command = [ sys.executable, start_file, "--name", "worker", "--address", "localhost:" + str(port), "--cpu_num", - str(cpu_num) + str(cpu_num), '--log_server_port', + str(log_server_port) ] monitor_command = [ sys.executable, monitor_file, "--monitor_port", @@ -216,6 +245,9 @@ def start_master(port, cpu_num, monitor_port, debug): """.format(start_info, master_ip, port) click.echo(monitor_info) + if not is_log_server_started(master_ip, log_server_port): + click.echo("# Fail to start the log server.") + @click.command("connect", short_help="Start a worker node.") @click.option( @@ -225,7 +257,18 @@ def start_master(port, cpu_num, monitor_port, debug): type=int, help="Set number of cpu manually. If not set, it will use all " "cpus of this machine.") -def start_worker(address, cpu_num): +@click.option( + "--log_server_port_range", + help=''' + Port range (start-end) of the log server on the worker. Default: 8000-9000. + The worker will pick a random avaliable port in [start, end] for the log server. + ''', + default="8000-9000", + type=str) +def start_worker(address, cpu_num, log_server_port_range): + start, end = parse_port_range(log_server_port_range) + log_server_port = get_port_from_range(start, end) + if not is_master_started(address): raise Exception("Worker can not connect to the master node, " + "please check if the input address {} ".format( @@ -237,16 +280,21 @@ def start_worker(address, cpu_num): command = [ sys.executable, start_file, "--name", "worker", "--address", address, "--cpu_num", - str(cpu_num) + str(cpu_num), "--log_server_port", + str(log_server_port) ] p = subprocess.Popen(command) + if not is_log_server_started(get_ip_address(), log_server_port): + click.echo("# Fail to start the log server.") + @click.command("stop", help="Exit the cluster.") def stop(): kill_process('remote/start.py') kill_process('remote/job.py') kill_process('remote/monitor.py') + kill_process('remote/log_server.py') @click.command("status") diff --git a/parl/remote/start.py b/parl/remote/start.py index d9aa231db65a04ee410d7df6660d9eaa75150828..83c8dca86a726b8c77307b0e080829ae67bc186a 100644 --- a/parl/remote/start.py +++ b/parl/remote/start.py @@ -28,13 +28,15 @@ def main(args): if args.name == 'master': port = args.port - master = Master(port) + monitor_port = args.monitor_port + master = Master(port, monitor_port) master.run() elif args.name == 'worker': address = args.address + log_server_port = args.log_server_port cpu_num = int(args.cpu_num) if args.cpu_num else None - worker = Worker(address, cpu_num) + worker = Worker(address, cpu_num, log_server_port) worker.run() else: @@ -48,5 +50,7 @@ if __name__ == "__main__": parser.add_argument('--port', default='1234', type=str) parser.add_argument('--address', default='localhost:1234', type=str) parser.add_argument('--cpu_num', default='', type=str) + parser.add_argument('--monitor_port', default='', type=str) + parser.add_argument('--log_server_port', default='', type=str) args = parser.parse_args() main(args) diff --git a/parl/remote/static/js/ansi_up.js b/parl/remote/static/js/ansi_up.js new file mode 100644 index 0000000000000000000000000000000000000000..b207399e24887a4d2b13e03482f98f16b2137cf0 --- /dev/null +++ b/parl/remote/static/js/ansi_up.js @@ -0,0 +1,421 @@ +/* ansi_up.js + * author : Dru Nelson + * license : MIT + * http://github.com/drudru/ansi_up + */ +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + // AMD. Register as an anonymous module. + define(['exports'], factory); + } else if (typeof exports === 'object' && typeof exports.nodeName !== 'string') { + // CommonJS + factory(exports); + } else { + // Browser globals + var exp = {}; + factory(exp); + root.AnsiUp = exp.default; + } +}(this, function (exports) { +"use strict"; +var __makeTemplateObject = (this && this.__makeTemplateObject) || function (cooked, raw) { + if (Object.defineProperty) { Object.defineProperty(cooked, "raw", { value: raw }); } else { cooked.raw = raw; } + return cooked; +}; +var PacketKind; +(function (PacketKind) { + PacketKind[PacketKind["EOS"] = 0] = "EOS"; + PacketKind[PacketKind["Text"] = 1] = "Text"; + PacketKind[PacketKind["Incomplete"] = 2] = "Incomplete"; + PacketKind[PacketKind["ESC"] = 3] = "ESC"; + PacketKind[PacketKind["Unknown"] = 4] = "Unknown"; + PacketKind[PacketKind["SGR"] = 5] = "SGR"; + PacketKind[PacketKind["OSCURL"] = 6] = "OSCURL"; +})(PacketKind || (PacketKind = {})); +var AnsiUp = (function () { + function AnsiUp() { + this.VERSION = "4.0.3"; + this.setup_palettes(); + this._use_classes = false; + this._escape_for_html = true; + this.bold = false; + this.fg = this.bg = null; + this._buffer = ''; + this._url_whitelist = { 'http': 1, 'https': 1 }; + } + Object.defineProperty(AnsiUp.prototype, "use_classes", { + get: function () { + return this._use_classes; + }, + set: function (arg) { + this._use_classes = arg; + }, + enumerable: true, + configurable: true + }); + Object.defineProperty(AnsiUp.prototype, "escape_for_html", { + get: function () { + return this._escape_for_html; + }, + set: function (arg) { + this._escape_for_html = arg; + }, + enumerable: true, + configurable: true + }); + Object.defineProperty(AnsiUp.prototype, "url_whitelist", { + get: function () { + return this._url_whitelist; + }, + set: function (arg) { + this._url_whitelist = arg; + }, + enumerable: true, + configurable: true + }); + AnsiUp.prototype.setup_palettes = function () { + var _this = this; + this.ansi_colors = + [ + [ + { rgb: [0, 0, 0], class_name: "ansi-black" }, + { rgb: [187, 0, 0], class_name: "ansi-red" }, + { rgb: [0, 187, 0], class_name: "ansi-green" }, + { rgb: [187, 187, 0], class_name: "ansi-yellow" }, + { rgb: [0, 0, 187], class_name: "ansi-blue" }, + { rgb: [187, 0, 187], class_name: "ansi-magenta" }, + { rgb: [0, 187, 187], class_name: "ansi-cyan" }, + { rgb: [255, 255, 255], class_name: "ansi-white" } + ], + [ + { rgb: [85, 85, 85], class_name: "ansi-bright-black" }, + { rgb: [255, 85, 85], class_name: "ansi-bright-red" }, + { rgb: [0, 255, 0], class_name: "ansi-bright-green" }, + { rgb: [255, 255, 85], class_name: "ansi-bright-yellow" }, + { rgb: [85, 85, 255], class_name: "ansi-bright-blue" }, + { rgb: [255, 85, 255], class_name: "ansi-bright-magenta" }, + { rgb: [85, 255, 255], class_name: "ansi-bright-cyan" }, + { rgb: [255, 255, 255], class_name: "ansi-bright-white" } + ] + ]; + this.palette_256 = []; + this.ansi_colors.forEach(function (palette) { + palette.forEach(function (rec) { + _this.palette_256.push(rec); + }); + }); + var levels = [0, 95, 135, 175, 215, 255]; + for (var r = 0; r < 6; ++r) { + for (var g = 0; g < 6; ++g) { + for (var b = 0; b < 6; ++b) { + var col = { rgb: [levels[r], levels[g], levels[b]], class_name: 'truecolor' }; + this.palette_256.push(col); + } + } + } + var grey_level = 8; + for (var i = 0; i < 24; ++i, grey_level += 10) { + var gry = { rgb: [grey_level, grey_level, grey_level], class_name: 'truecolor' }; + this.palette_256.push(gry); + } + }; + AnsiUp.prototype.escape_txt_for_html = function (txt) { + return txt.replace(/[&<>]/gm, function (str) { + if (str === "&") + return "&"; + if (str === "<") + return "<"; + if (str === ">") + return ">"; + }); + }; + AnsiUp.prototype.append_buffer = function (txt) { + var str = this._buffer + txt; + this._buffer = str; + }; + AnsiUp.prototype.get_next_packet = function () { + var pkt = { + kind: PacketKind.EOS, + text: '', + url: '' + }; + var len = this._buffer.length; + if (len == 0) + return pkt; + var pos = this._buffer.indexOf("\x1B"); + if (pos == -1) { + pkt.kind = PacketKind.Text; + pkt.text = this._buffer; + this._buffer = ''; + return pkt; + } + if (pos > 0) { + pkt.kind = PacketKind.Text; + pkt.text = this._buffer.slice(0, pos); + this._buffer = this._buffer.slice(pos); + return pkt; + } + if (pos == 0) { + if (len == 1) { + pkt.kind = PacketKind.Incomplete; + return pkt; + } + var next_char = this._buffer.charAt(1); + if ((next_char != '[') && (next_char != ']')) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + if (next_char == '[') { + if (!this._csi_regex) { + this._csi_regex = rgx(__makeTemplateObject(["\n ^ # beginning of line\n #\n # First attempt\n (?: # legal sequence\n \u001B[ # CSI\n ([<-?]?) # private-mode char\n ([d;]*) # any digits or semicolons\n ([ -/]? # an intermediate modifier\n [@-~]) # the command\n )\n | # alternate (second attempt)\n (?: # illegal sequence\n \u001B[ # CSI\n [ -~]* # anything legal\n ([\0-\u001F:]) # anything illegal\n )\n "], ["\n ^ # beginning of line\n #\n # First attempt\n (?: # legal sequence\n \\x1b\\[ # CSI\n ([\\x3c-\\x3f]?) # private-mode char\n ([\\d;]*) # any digits or semicolons\n ([\\x20-\\x2f]? # an intermediate modifier\n [\\x40-\\x7e]) # the command\n )\n | # alternate (second attempt)\n (?: # illegal sequence\n \\x1b\\[ # CSI\n [\\x20-\\x7e]* # anything legal\n ([\\x00-\\x1f:]) # anything illegal\n )\n "])); + } + var match = this._buffer.match(this._csi_regex); + if (match === null) { + pkt.kind = PacketKind.Incomplete; + return pkt; + } + if (match[4]) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + if ((match[1] != '') || (match[3] != 'm')) + pkt.kind = PacketKind.Unknown; + else + pkt.kind = PacketKind.SGR; + pkt.text = match[2]; + var rpos = match[0].length; + this._buffer = this._buffer.slice(rpos); + return pkt; + } + if (next_char == ']') { + if (len < 4) { + pkt.kind = PacketKind.Incomplete; + return pkt; + } + if ((this._buffer.charAt(2) != '8') + || (this._buffer.charAt(3) != ';')) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + if (!this._osc_st) { + this._osc_st = rgxG(__makeTemplateObject(["\n (?: # legal sequence\n (\u001B\\) # ESC | # alternate\n (\u0007) # BEL (what xterm did)\n )\n | # alternate (second attempt)\n ( # illegal sequence\n [\0-\u0006] # anything illegal\n | # alternate\n [\b-\u001A] # anything illegal\n | # alternate\n [\u001C-\u001F] # anything illegal\n )\n "], ["\n (?: # legal sequence\n (\\x1b\\\\) # ESC \\\n | # alternate\n (\\x07) # BEL (what xterm did)\n )\n | # alternate (second attempt)\n ( # illegal sequence\n [\\x00-\\x06] # anything illegal\n | # alternate\n [\\x08-\\x1a] # anything illegal\n | # alternate\n [\\x1c-\\x1f] # anything illegal\n )\n "])); + } + this._osc_st.lastIndex = 0; + { + var match_1 = this._osc_st.exec(this._buffer); + if (match_1 === null) { + pkt.kind = PacketKind.Incomplete; + return pkt; + } + if (match_1[3]) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + } + { + var match_2 = this._osc_st.exec(this._buffer); + if (match_2 === null) { + pkt.kind = PacketKind.Incomplete; + return pkt; + } + if (match_2[3]) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + } + if (!this._osc_regex) { + this._osc_regex = rgx(__makeTemplateObject(["\n ^ # beginning of line\n #\n \u001B]8; # OSC Hyperlink\n [ -:<-~]* # params (excluding ;)\n ; # end of params\n ([!-~]{0,512}) # URL capture\n (?: # ST\n (?:\u001B\\) # ESC | # alternate\n (?:\u0007) # BEL (what xterm did)\n )\n ([!-~]+) # TEXT capture\n \u001B]8;; # OSC Hyperlink End\n (?: # ST\n (?:\u001B\\) # ESC | # alternate\n (?:\u0007) # BEL (what xterm did)\n )\n "], ["\n ^ # beginning of line\n #\n \\x1b\\]8; # OSC Hyperlink\n [\\x20-\\x3a\\x3c-\\x7e]* # params (excluding ;)\n ; # end of params\n ([\\x21-\\x7e]{0,512}) # URL capture\n (?: # ST\n (?:\\x1b\\\\) # ESC \\\n | # alternate\n (?:\\x07) # BEL (what xterm did)\n )\n ([\\x21-\\x7e]+) # TEXT capture\n \\x1b\\]8;; # OSC Hyperlink End\n (?: # ST\n (?:\\x1b\\\\) # ESC \\\n | # alternate\n (?:\\x07) # BEL (what xterm did)\n )\n "])); + } + var match = this._buffer.match(this._osc_regex); + if (match === null) { + pkt.kind = PacketKind.ESC; + pkt.text = this._buffer.slice(0, 1); + this._buffer = this._buffer.slice(1); + return pkt; + } + pkt.kind = PacketKind.OSCURL; + pkt.url = match[1]; + pkt.text = match[2]; + var rpos = match[0].length; + this._buffer = this._buffer.slice(rpos); + return pkt; + } + } + }; + AnsiUp.prototype.ansi_to_html = function (txt) { + this.append_buffer(txt); + var blocks = []; + while (true) { + var packet = this.get_next_packet(); + if ((packet.kind == PacketKind.EOS) + || (packet.kind == PacketKind.Incomplete)) + break; + if ((packet.kind == PacketKind.ESC) + || (packet.kind == PacketKind.Unknown)) + continue; + if (packet.kind == PacketKind.Text) + blocks.push(this.transform_to_html(this.with_state(packet))); + else if (packet.kind == PacketKind.SGR) + this.process_ansi(packet); + else if (packet.kind == PacketKind.OSCURL) + blocks.push(this.process_hyperlink(packet)); + } + return blocks.join(""); + }; + AnsiUp.prototype.with_state = function (pkt) { + return { bold: this.bold, fg: this.fg, bg: this.bg, text: pkt.text }; + }; + AnsiUp.prototype.process_ansi = function (pkt) { + var sgr_cmds = pkt.text.split(';'); + while (sgr_cmds.length > 0) { + var sgr_cmd_str = sgr_cmds.shift(); + var num = parseInt(sgr_cmd_str, 10); + if (isNaN(num) || num === 0) { + this.fg = this.bg = null; + this.bold = false; + } + else if (num === 1) { + this.bold = true; + } + else if (num === 22) { + this.bold = false; + } + else if (num === 39) { + this.fg = null; + } + else if (num === 49) { + this.bg = null; + } + else if ((num >= 30) && (num < 38)) { + this.fg = this.ansi_colors[0][(num - 30)]; + } + else if ((num >= 40) && (num < 48)) { + this.bg = this.ansi_colors[0][(num - 40)]; + } + else if ((num >= 90) && (num < 98)) { + this.fg = this.ansi_colors[1][(num - 90)]; + } + else if ((num >= 100) && (num < 108)) { + this.bg = this.ansi_colors[1][(num - 100)]; + } + else if (num === 38 || num === 48) { + if (sgr_cmds.length > 0) { + var is_foreground = (num === 38); + var mode_cmd = sgr_cmds.shift(); + if (mode_cmd === '5' && sgr_cmds.length > 0) { + var palette_index = parseInt(sgr_cmds.shift(), 10); + if (palette_index >= 0 && palette_index <= 255) { + if (is_foreground) + this.fg = this.palette_256[palette_index]; + else + this.bg = this.palette_256[palette_index]; + } + } + if (mode_cmd === '2' && sgr_cmds.length > 2) { + var r = parseInt(sgr_cmds.shift(), 10); + var g = parseInt(sgr_cmds.shift(), 10); + var b = parseInt(sgr_cmds.shift(), 10); + if ((r >= 0 && r <= 255) && (g >= 0 && g <= 255) && (b >= 0 && b <= 255)) { + var c = { rgb: [r, g, b], class_name: 'truecolor' }; + if (is_foreground) + this.fg = c; + else + this.bg = c; + } + } + } + } + } + }; + AnsiUp.prototype.transform_to_html = function (fragment) { + var txt = fragment.text; + if (txt.length === 0) + return txt; + if (this._escape_for_html) + txt = this.escape_txt_for_html(txt); + if (!fragment.bold && fragment.fg === null && fragment.bg === null) + return txt; + var styles = []; + var classes = []; + var fg = fragment.fg; + var bg = fragment.bg; + if (fragment.bold) + styles.push('font-weight:bold'); + if (!this._use_classes) { + if (fg) + styles.push("color:rgb(" + fg.rgb.join(',') + ")"); + if (bg) + styles.push("background-color:rgb(" + bg.rgb + ")"); + } + else { + if (fg) { + if (fg.class_name !== 'truecolor') { + classes.push(fg.class_name + "-fg"); + } + else { + styles.push("color:rgb(" + fg.rgb.join(',') + ")"); + } + } + if (bg) { + if (bg.class_name !== 'truecolor') { + classes.push(bg.class_name + "-bg"); + } + else { + styles.push("background-color:rgb(" + bg.rgb.join(',') + ")"); + } + } + } + var class_string = ''; + var style_string = ''; + if (classes.length) + class_string = " class=\"" + classes.join(' ') + "\""; + if (styles.length) + style_string = " style=\"" + styles.join(';') + "\""; + return "" + txt + ""; + }; + ; + AnsiUp.prototype.process_hyperlink = function (pkt) { + var parts = pkt.url.split(':'); + if (parts.length < 1) + return ''; + if (!this._url_whitelist[parts[0]]) + return ''; + var result = "" + this.escape_txt_for_html(pkt.text) + ""; + return result; + }; + return AnsiUp; +}()); +function rgx(tmplObj) { + var subst = []; + for (var _i = 1; _i < arguments.length; _i++) { + subst[_i - 1] = arguments[_i]; + } + var regexText = tmplObj.raw[0]; + var wsrgx = /^\s+|\s+\n|\s*#[\s\S]*?\n|\n/gm; + var txt2 = regexText.replace(wsrgx, ''); + return new RegExp(txt2); +} +function rgxG(tmplObj) { + var subst = []; + for (var _i = 1; _i < arguments.length; _i++) { + subst[_i - 1] = arguments[_i]; + } + var regexText = tmplObj.raw[0]; + var wsrgx = /^\s+|\s+\n|\s*#[\s\S]*?\n|\n/gm; + var txt2 = regexText.replace(wsrgx, ''); + return new RegExp(txt2, 'g'); +} +//# sourceMappingURL=ansi_up.js.map + Object.defineProperty(exports, "__esModule", { value: true }); + exports.default = AnsiUp; +})); diff --git a/parl/remote/static/js/jquery.ajax-cross-origin.min.js b/parl/remote/static/js/jquery.ajax-cross-origin.min.js new file mode 100644 index 0000000000000000000000000000000000000000..cd57dcff2843d0d298fa36851aefedb7984ebe1e --- /dev/null +++ b/parl/remote/static/js/jquery.ajax-cross-origin.min.js @@ -0,0 +1,57 @@ +/* + jQuery AJAX Cross Origin v1.3 (http://www.ajax-cross-origin.com) + jQuery plugin to bypass Same-origin_policy using Google Apps Script. + + references: + http://en.wikipedia.org/wiki/Same-origin_policy + http://www.google.com/script/start/ + + (c) 2014, Writen by Erez Ninio. site: www.dealhotelbook.com + + Licensed under the Creative Commons Attribution 3.0 Unported License. + For details, see http://creativecommons.org/licenses/by/3.0/. +*/ + +var proxyJsonp = + "https://script.google.com/macros/s/AKfycbwmqG55tt2d2FcT_WQ3WjCSKmtyFpkOcdprSITn45-4UgVJnzp9/exec"; +jQuery.ajaxOrig = jQuery.ajax; +jQuery.ajax = function (a, b) { + function d(a) { + a = encodeURI(a).replace(/&/g, "%26"); + return proxyJsonp + "?url=" + a + "&callback=?"; + } + var c = "object" === typeof a ? a : b || {}; + c.url = c.url || ("string" === typeof a ? a : ""); + var c = jQuery.ajaxSetup({}, c), + e = (function (a, c) { + var b = document.createElement("a"); + b.href = a; + return ( + c.crossOrigin && + "http" == a.substr(0, 4).toLowerCase() && + "localhost" != b.hostname && + "127.0.0.1" != b.hostname && + b.hostname != window.location.hostname + ); + })(c.url, c); + c.proxy && + 0 < c.proxy.length && + ((proxyJsonp = c.proxy), + "object" === typeof a + ? (a.crossDomain = !0) + : "object" === typeof b && (b.crossDomain = !0)); + e && + ("object" === typeof a + ? a.url && + ((a.url = d(a.url)), + a.charset && (a.url += "&charset=" + a.charset), + (a.dataType = "json")) + : "string" === typeof a && + "object" === typeof b && + ((a = d(a)), + b.charset && (a += "&charset=" + b.charset), + (b.dataType = "json"))); + return jQuery.ajaxOrig.apply(this, arguments); +}; +jQuery.ajax.prototype = new jQuery.ajaxOrig(); +jQuery.ajax.prototype.constructor = jQuery.ajax; diff --git a/parl/remote/static/js/parl.js b/parl/remote/static/js/parl.js index 117e2d5542e69213a0b4ae7e04d5b6c6533006c8..e158e69917f969c62b6be2daf4b43176f0674ba7 100644 --- a/parl/remote/static/js/parl.js +++ b/parl/remote/static/js/parl.js @@ -185,7 +185,8 @@ function autoTable(res) { var s3 = `${res.clients[i].client_address}`; var s4 = `${res.clients[i].actor_num}`; var s5 = `${res.clients[i].time}`; - tr.innerHTML = s1 + s2 + s3 + s4 + s5; + var s6 = `link`; + tr.innerHTML = s1 + s2 + s3 + s4 + s5 + s6; table.appendChild(tr); } }; diff --git a/parl/remote/templates/clients.html b/parl/remote/templates/clients.html index b87962f11d1a41d649ec953d426d418be0b2baf1..e0089b6422bb4a5af43372d3962adbb9303218af 100644 --- a/parl/remote/templates/clients.html +++ b/parl/remote/templates/clients.html @@ -43,10 +43,11 @@ Hostname Actor Num Time (min) + Log - Loading Data... + Loading Data... diff --git a/parl/remote/templates/jobs.html b/parl/remote/templates/jobs.html new file mode 100644 index 0000000000000000000000000000000000000000..56e8a775a5dd6eb86e17bfc61fedf660e97d7ff0 --- /dev/null +++ b/parl/remote/templates/jobs.html @@ -0,0 +1,192 @@ + + + + + Parl Cluster + + + + + + + + + + +
+
+ Jobs Monitor +
+ +
+
+

+ Remote Job Log +

+

+ Client ID: {{ client_id }} +

+
+
+
+
+ + + + + + + diff --git a/parl/remote/tests/log_server_test.py b/parl/remote/tests/log_server_test.py new file mode 100644 index 0000000000000000000000000000000000000000..b8a8c37a6164cbe5c045bd34b205e06d954166e1 --- /dev/null +++ b/parl/remote/tests/log_server_test.py @@ -0,0 +1,180 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import multiprocessing +import os +import pickle +import subprocess +import sys +import tempfile +import threading +import time +import unittest + +import requests + +import parl +from parl.remote.client import disconnect, get_global_client +from parl.remote.master import Master +from parl.remote.worker import Worker +from parl.utils import _IS_WINDOWS, get_free_tcp_port + + +@parl.remote_class +class Actor(object): + def __init__(self, number=None, arg1=None, arg2=None): + self.number = number + self.arg1 = arg1 + self.arg2 = arg2 + + def sim_output(self, start, end): + output = "" + print(self.number) + output += str(self.number) + output += "\n" + for i in range(start, end): + print(i) + output += str(i) + output += "\n" + return output + + +class TestLogServer(unittest.TestCase): + def tearDown(self): + disconnect() + + #In windows, multiprocessing.Process cannot run the method of class, but static method is ok. + @staticmethod + def _connect_and_create_actor(cluster_addr): + parl.connect(cluster_addr) + outputs = [] + for i in range(2): + actor = Actor(number=i) + ret = actor.sim_output(1, 4) + assert ret != "" + outputs.append(ret) + return outputs + + def test_log_server(self): + master_port = 8401 + # start the master + master = Master(port=master_port) + th = threading.Thread(target=master.run) + th.start() + time.sleep(1) + + cluster_addr = 'localhost:{}'.format(master_port) + log_server_port = 8402 + worker = Worker(cluster_addr, 4, log_server_port=log_server_port) + outputs = self._connect_and_create_actor(cluster_addr) + + # Get status + status = master._get_status() + client_jobs = pickle.loads(status).get('client_jobs') + self.assertIsNotNone(client_jobs) + + # Get job id + client = get_global_client() + jobs = client_jobs.get(client.client_id) + self.assertIsNotNone(jobs) + + for job_id, log_server_addr in jobs.items(): + log_url = "http://{}/get-log".format(log_server_addr) + # Test response without job_id + r = requests.get(log_url) + self.assertEqual(r.status_code, 400) + # Test normal response + r = requests.get(log_url, params={'job_id': job_id}) + self.assertEqual(r.status_code, 200) + log_content = json.loads(r.text).get('log') + self.assertIsNotNone(log_content) + self.assertIn(log_content, outputs) + + # Test download + download_url = "http://{}/download-log".format(log_server_addr) + r = requests.get(download_url, params={'job_id': job_id}) + self.assertEqual(r.status_code, 200) + self.assertIn(r.text, outputs) + + disconnect() + worker.exit() + master.exit() + + def test_monitor_query_log_server(self): + master_port = 8403 + monitor_port = 8404 + # start the master + master = Master(port=master_port, monitor_port=monitor_port) + th = threading.Thread(target=master.run) + th.start() + time.sleep(1) + # start the cluster monitor + monitor_file = __file__.replace('tests/log_server_test.pyc', + 'monitor.py') + monitor_file = monitor_file.replace('tests/log_server_test.py', + 'monitor.py') + command = [ + sys.executable, monitor_file, "--monitor_port", + str(monitor_port), "--address", "localhost:" + str(master_port) + ] + if _IS_WINDOWS: + FNULL = tempfile.TemporaryFile() + else: + FNULL = open(os.devnull, 'w') + monitor_proc = subprocess.Popen( + command, + stdout=FNULL, + stderr=subprocess.STDOUT, + ) + + # Start worker + cluster_addr = 'localhost:{}'.format(master_port) + log_server_port = 8405 + worker = Worker(cluster_addr, 4, log_server_port=log_server_port) + + # Test monitor API + outputs = self._connect_and_create_actor(cluster_addr) + time.sleep(5) # Wait for the status update + client = get_global_client() + jobs_url = "{}/get-jobs?client_id={}".format(master.monitor_url, + client.client_id) + r = requests.get(jobs_url) + self.assertEqual(r.status_code, 200) + data = json.loads(r.text) + for job in data: + log_url = job.get('log_url') + self.assertIsNotNone(log_url) + r = requests.get(log_url) + self.assertEqual(r.status_code, 200) + log_content = json.loads(r.text).get('log') + self.assertIsNotNone(log_content) + self.assertIn(log_content, outputs) + + # Test download + download_url = job.get('download_url') + r = requests.get(download_url) + self.assertEqual(r.status_code, 200) + self.assertIn(r.text, outputs) + + # Clean context + monitor_proc.kill() + monitor_proc.wait() + disconnect() + worker.exit() + master.exit() + + +if __name__ == '__main__': + unittest.main() diff --git a/parl/remote/worker.py b/parl/remote/worker.py index 0046cecfda7d3fe46799c38654bb8edf0e73f3fc..eec5598c6d081ca054541657c61670ecffc70cee 100644 --- a/parl/remote/worker.py +++ b/parl/remote/worker.py @@ -20,6 +20,7 @@ import signal import socket import subprocess import sys +import tempfile import time import threading import warnings @@ -63,7 +64,7 @@ class Worker(object): cpu_num (int): Number of cpu to be used on the worker. """ - def __init__(self, master_address, cpu_num=None): + def __init__(self, master_address, cpu_num=None, log_server_port=None): self.lock = threading.Lock() self.heartbeat_socket_initialized = threading.Event() self.ctx = zmq.Context.instance() @@ -75,6 +76,9 @@ class Worker(object): self._set_cpu_num(cpu_num) self.job_buffer = queue.Queue(maxsize=self.cpu_num) self._create_sockets() + # create log server + self.log_server_proc, self.log_server_address = self._create_log_server( + port=log_server_port) # create a thread that waits commands from the job to kill the job. self.kill_job_thread = threading.Thread(target=self._reply_kill_job) @@ -192,7 +196,8 @@ class Worker(object): job_file = job_file.replace('worker.py', 'job.py') command = [ sys.executable, job_file, "--worker_address", - self.reply_job_address + self.reply_job_address, "--log_server_address", + self.log_server_address ] if sys.version_info.major == 3: @@ -354,10 +359,40 @@ class Worker(object): "[Worker] lost connection with the master, will exit reply heartbeat for master." ) self.worker_status.clear() + self.log_server_proc.kill() + self.log_server_proc.wait() # exit the worker self.worker_is_alive = False self.exit() + def _create_log_server(self, port): + log_server_file = __file__.replace('worker.pyc', 'log_server.py') + log_server_file = log_server_file.replace('worker.py', 'log_server.py') + + if port is None: + port = "0" # `0` means using a random port in flask + command = [ + sys.executable, log_server_file, "--port", + str(port), "--log_dir", "~/.parl_data/job/", "--line_num", "500" + ] + + if sys.version_info.major == 3: + warnings.simplefilter("ignore", ResourceWarning) + + if _IS_WINDOWS: + FNULL = tempfile.TemporaryFile() + else: + FNULL = open(os.devnull, 'w') + log_server_proc = subprocess.Popen( + command, + stdout=FNULL, + stderr=subprocess.STDOUT, + ) + FNULL.close() + + log_server_address = "{}:{}".format(self.worker_ip, port) + return log_server_proc, log_server_address + def exit(self): """close the worker""" self.worker_is_alive = False diff --git a/parl/utils/machine_info.py b/parl/utils/machine_info.py index fefa57317ea6bbf3db983c104e54777eb36099f2..08fae4a853233fe5432a7601834098349c25e420 100644 --- a/parl/utils/machine_info.py +++ b/parl/utils/machine_info.py @@ -14,11 +14,15 @@ import os import platform +import random import socket import subprocess from parl.utils import logger, _HAS_FLUID, _IS_WINDOWS -__all__ = ['get_gpu_count', 'get_ip_address', 'is_gpu_available'] +__all__ = [ + 'get_gpu_count', 'get_ip_address', 'is_gpu_available', 'get_free_tcp_port', + 'is_port_available', 'get_port_from_range' +] def get_ip_address(): @@ -100,3 +104,32 @@ def is_gpu_available(): But PARL found that Paddle was not complied with CUDA, which may cause issues." ) return ret + + +def get_free_tcp_port(): + tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp.bind(('', 0)) + addr, port = tcp.getsockname() + tcp.close() + return str(port) + + +def is_port_available(port): + """ Check if a port is used. + + True if the port is available for connection. + """ + port = int(port) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + available = sock.connect_ex(('localhost', port)) + sock.close() + return available + + +def get_port_from_range(start, end): + while True: + port = random.randint(start, end) + if is_port_available(port): + break + + return port diff --git a/setup.py b/setup.py index 90c6465024884e2dcc9de09a2bf737533cf417e0..56af19b09fe0f7bf658557ee5b8ab4f7e25498ab 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ setup( "flask>=1.0.4", "click", "psutil>=5.6.2", + "flask_cors", "visualdl>=2.0.0b;python_version>='3' and platform_system=='Linux'", ], classifiers=[