未验证 提交 6abe4f69 编写于 作者: Z Zheyue Tan 提交者: GitHub

Add log monitor for remote actors (#269)

* add logger for remote actors

* resolve issues

* `get_free_tcp_port` and `is_port_available` move to utils.py

* add information for user in the remote log web UI

* yapf formatting

* fix unittest error

* yapf formatting

* update remote job web UI

* add option for log server port

* fix windows support

* fix `os.devnull` in windows

* fix port argument passing

* fix bug

* fix kill log_server

* fix log server not started

* add log_server_test
Co-authored-by: NHongsheng Zeng <zenghongsheng@baidu.com>
上级 a9159021
...@@ -59,6 +59,7 @@ class Client(object): ...@@ -59,6 +59,7 @@ class Client(object):
self.heartbeat_socket_initialized = threading.Event() self.heartbeat_socket_initialized = threading.Event()
self.master_is_alive = True self.master_is_alive = True
self.client_is_alive = True self.client_is_alive = True
self.log_monitor_url = None
self.executable_path = self.get_executable_path() self.executable_path = self.get_executable_path()
...@@ -142,14 +143,19 @@ class Client(object): ...@@ -142,14 +143,19 @@ class Client(object):
thread.start() thread.start()
self.heartbeat_socket_initialized.wait() self.heartbeat_socket_initialized.wait()
self.client_id = self.reply_master_heartbeat_address.replace(':', '_') + \
'_' + str(int(time.time()))
# check if the master is connected properly # check if the master is connected properly
try: try:
self.submit_job_socket.send_multipart([ self.submit_job_socket.send_multipart([
remote_constants.CLIENT_CONNECT_TAG, remote_constants.CLIENT_CONNECT_TAG,
to_byte(self.heartbeat_master_address), to_byte(self.reply_master_heartbeat_address),
to_byte(socket.gethostname()) to_byte(socket.gethostname()),
to_byte(self.client_id),
]) ])
_ = self.submit_job_socket.recv_multipart() message = self.submit_job_socket.recv_multipart()
self.log_monitor_url = to_str(message[1])
except zmq.error.Again as e: except zmq.error.Again as e:
logger.warning("[Client] Can not connect to the master, please " logger.warning("[Client] Can not connect to the master, please "
"check if master is started and ensure the input " "check if master is started and ensure the input "
...@@ -160,16 +166,16 @@ class Client(object): ...@@ -160,16 +166,16 @@ class Client(object):
"address {} is correct.".format(master_address)) "address {} is correct.".format(master_address))
def _reply_heartbeat(self): def _reply_heartbeat(self):
"""Reply heartbeat signals to the specific node.""" """Reply heartbeat signals to the master node."""
socket = self.ctx.socket(zmq.REP) socket = self.ctx.socket(zmq.REP)
socket.linger = 0 socket.linger = 0
socket.setsockopt(zmq.RCVTIMEO, socket.setsockopt(zmq.RCVTIMEO,
remote_constants.HEARTBEAT_RCVTIMEO_S * 1000) remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
heartbeat_master_port =\ reply_master_heartbeat_port =\
socket.bind_to_random_port(addr="tcp://*") socket.bind_to_random_port(addr="tcp://*")
self.heartbeat_master_address = "{}:{}".format(get_ip_address(), self.reply_master_heartbeat_address = "{}:{}".format(
heartbeat_master_port) get_ip_address(), reply_master_heartbeat_port)
self.heartbeat_socket_initialized.set() self.heartbeat_socket_initialized.set()
connected = False connected = False
while self.client_is_alive and self.master_is_alive: while self.client_is_alive and self.master_is_alive:
...@@ -181,9 +187,9 @@ class Client(object): ...@@ -181,9 +187,9 @@ class Client(object):
remote_constants.HEARTBEAT_TAG, remote_constants.HEARTBEAT_TAG,
to_byte(self.executable_path), to_byte(self.executable_path),
to_byte(str(self.actor_num)), to_byte(str(self.actor_num)),
to_byte(str(elapsed_time)) to_byte(str(elapsed_time)),
]) to_byte(str(self.log_monitor_url)),
connected = True ]) # TODO: remove additional information
except zmq.error.Again as e: except zmq.error.Again as e:
if connected: if connected:
logger.warning("[Client] Cannot connect to the master." logger.warning("[Client] Cannot connect to the master."
...@@ -200,7 +206,7 @@ class Client(object): ...@@ -200,7 +206,7 @@ class Client(object):
def _check_and_monitor_job(self, job_heartbeat_address, def _check_and_monitor_job(self, job_heartbeat_address,
ping_heartbeat_address, max_memory): ping_heartbeat_address, max_memory):
""" Sometimes the client may receive a job that is dead, thus """ Sometimes the client may receive a job that is dead, thus
we have to check if this job is still alive before sending it to the actor. we have to check if this job is still alive before adding it to the `actor_num`.
""" """
# job_heartbeat_socket: sends heartbeat signal to job # job_heartbeat_socket: sends heartbeat signal to job
job_heartbeat_socket = self.ctx.socket(zmq.REQ) job_heartbeat_socket = self.ctx.socket(zmq.REQ)
...@@ -289,7 +295,8 @@ class Client(object): ...@@ -289,7 +295,8 @@ class Client(object):
self.lock.acquire() self.lock.acquire()
self.submit_job_socket.send_multipart([ self.submit_job_socket.send_multipart([
remote_constants.CLIENT_SUBMIT_TAG, remote_constants.CLIENT_SUBMIT_TAG,
to_byte(self.heartbeat_master_address) to_byte(self.reply_master_heartbeat_address),
to_byte(self.client_id),
]) ])
message = self.submit_job_socket.recv_multipart() message = self.submit_job_socket.recv_multipart()
self.lock.release() self.lock.release()
...@@ -356,6 +363,8 @@ def connect(master_address, distributed_files=[]): ...@@ -356,6 +363,8 @@ def connect(master_address, distributed_files=[]):
if GLOBAL_CLIENT.process_id != cur_process_id: if GLOBAL_CLIENT.process_id != cur_process_id:
GLOBAL_CLIENT = Client(master_address, cur_process_id, GLOBAL_CLIENT = Client(master_address, cur_process_id,
distributed_files) distributed_files)
logger.info("Remote actors log url: {}".format(
GLOBAL_CLIENT.log_monitor_url))
def get_global_client(): def get_global_client():
......
...@@ -28,7 +28,8 @@ class ClusterMonitor(object): ...@@ -28,7 +28,8 @@ class ClusterMonitor(object):
def __init__(self): def __init__(self):
self.status = { self.status = {
'workers': defaultdict(dict), 'workers': defaultdict(dict),
'clients': defaultdict(dict) 'clients': defaultdict(dict),
'client_jobs': defaultdict(dict),
} }
self.lock = threading.Lock() self.lock = threading.Lock()
...@@ -46,6 +47,11 @@ class ClusterMonitor(object): ...@@ -46,6 +47,11 @@ class ClusterMonitor(object):
worker_status['hostname'] = hostname worker_status['hostname'] = hostname
self.lock.release() self.lock.release()
def add_client_job(self, client_id, job_info):
self.lock.acquire()
self.status['client_jobs'][client_id].update(job_info)
self.lock.release()
def update_client_status(self, client_status, client_address, def update_client_status(self, client_status, client_address,
client_hostname): client_hostname):
"""Update client status with message send from client heartbeat. """Update client status with message send from client heartbeat.
...@@ -61,7 +67,8 @@ class ClusterMonitor(object): ...@@ -61,7 +67,8 @@ class ClusterMonitor(object):
'client_address': client_hostname, 'client_address': client_hostname,
'file_path': to_str(client_status[1]), 'file_path': to_str(client_status[1]),
'actor_num': int(to_str(client_status[2])), 'actor_num': int(to_str(client_status[2])),
'time': to_str(client_status[3]) 'time': to_str(client_status[3]),
'log_monitor_url': to_str(client_status[4]),
} }
self.lock.release() self.lock.release()
...@@ -96,11 +103,11 @@ class ClusterMonitor(object): ...@@ -96,11 +103,11 @@ class ClusterMonitor(object):
self.status['workers'].pop(worker_address) self.status['workers'].pop(worker_address)
self.lock.release() self.lock.release()
def drop_cluster_status(self, client_address): def drop_client_status(self, client_address):
"""Drop cluster status when it exits. """Drop client status when it exits.
Args: Args:
cluster_address (str): IP address of the exited client. client_address (str): IP address of the exited client.
""" """
self.lock.acquire() self.lock.acquire()
if client_address in self.status['clients']: if client_address in self.status['clients']:
......
...@@ -48,7 +48,7 @@ class Job(object): ...@@ -48,7 +48,7 @@ class Job(object):
""" """
def __init__(self, worker_address): def __init__(self, worker_address, log_server_address):
""" """
Args: Args:
worker_address(str): worker_address for sending job information(e.g, pid) worker_address(str): worker_address for sending job information(e.g, pid)
...@@ -60,13 +60,15 @@ class Job(object): ...@@ -60,13 +60,15 @@ class Job(object):
self.max_memory = None self.max_memory = None
self.job_address_receiver, job_address_sender = Pipe() self.job_address_receiver, job_address_sender = Pipe()
self.job_id_receiver, job_id_sender = Pipe()
self.worker_address = worker_address self.worker_address = worker_address
self.log_server_address = log_server_address
self.job_ip = get_ip_address() self.job_ip = get_ip_address()
self.pid = os.getpid() self.pid = os.getpid()
self.run_job_process = Process( self.run_job_process = Process(
target=self.run, args=(job_address_sender, )) target=self.run, args=(job_address_sender, job_id_sender))
self.run_job_process.start() self.run_job_process.start()
""" """
NOTE: NOTE:
...@@ -102,6 +104,7 @@ class Job(object): ...@@ -102,6 +104,7 @@ class Job(object):
""" """
# wait for another process to create reply socket # wait for another process to create reply socket
self.job_address = self.job_address_receiver.recv() self.job_address = self.job_address_receiver.recv()
self.job_id = self.job_id_receiver.recv()
self.ctx = zmq.Context() self.ctx = zmq.Context()
# create the job_socket # create the job_socket
...@@ -135,7 +138,8 @@ class Job(object): ...@@ -135,7 +138,8 @@ class Job(object):
# sends job information to the worker # sends job information to the worker
initialized_job = InitializedJob( initialized_job = InitializedJob(
self.job_address, worker_heartbeat_address, self.job_address, worker_heartbeat_address,
client_heartbeat_address, ping_heartbeat_address, None, self.pid) client_heartbeat_address, ping_heartbeat_address, None, self.pid,
self.job_id, self.log_server_address)
self.job_socket.send_multipart( self.job_socket.send_multipart(
[remote_constants.NORMAL_TAG, [remote_constants.NORMAL_TAG,
cloudpickle.dumps(initialized_job)]) cloudpickle.dumps(initialized_job)])
...@@ -332,7 +336,7 @@ class Job(object): ...@@ -332,7 +336,7 @@ class Job(object):
return obj return obj
def run(self, job_address_sender): def run(self, job_address_sender, job_id_sender):
"""An infinite loop waiting for a new task. """An infinite loop waiting for a new task.
Args: Args:
...@@ -347,7 +351,15 @@ class Job(object): ...@@ -347,7 +351,15 @@ class Job(object):
job_ip = get_ip_address() job_ip = get_ip_address()
job_address = "{}:{}".format(job_ip, job_port) job_address = "{}:{}".format(job_ip, job_port)
job_id = job_address.replace(':', '_') + '_' + str(int(time.time()))
self.log_dir = os.path.expanduser('~/.parl_data/job/{}'.format(job_id))
logger.set_dir(self.log_dir)
logger.info(
"[Job] Job {} initialized. Reply heartbeat socket Address: {}.".
format(job_id, job_address))
job_address_sender.send(job_address) job_address_sender.send(job_address)
job_id_sender.send(job_id)
try: try:
# receive source code from the actor and append them to the environment variables. # receive source code from the actor and append them to the environment variables.
...@@ -391,7 +403,15 @@ class Job(object): ...@@ -391,7 +403,15 @@ class Job(object):
function_name = to_str(message[1]) function_name = to_str(message[1])
data = message[2] data = message[2]
args, kwargs = loads_argument(data) args, kwargs = loads_argument(data)
ret = getattr(obj, function_name)(*args, **kwargs)
# Redirect stdout to stdout.log temporarily
logfile_path = os.path.join(self.log_dir, 'stdout.log')
with open(logfile_path, 'a') as f:
tmp = sys.stdout
sys.stdout = f
ret = getattr(obj, function_name)(*args, **kwargs)
sys.stdout = tmp
ret = dumps_return(ret) ret = dumps_return(ret)
reply_socket.send_multipart( reply_socket.send_multipart(
...@@ -450,5 +470,10 @@ if __name__ == "__main__": ...@@ -450,5 +470,10 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
"--worker_address", required=True, type=str, help="worker_address") "--worker_address", required=True, type=str, help="worker_address")
parser.add_argument(
"--log_server_address",
required=True,
type=str,
help="log_server_address, address of the log web server on worker")
args = parser.parse_args() args = parser.parse_args()
job = Job(args.worker_address) job = Job(args.worker_address, args.log_server_address)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import linecache
import os
from flask import Flask, current_app, jsonify, make_response, request, send_file
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
@app.route(
"/get-log", methods=[
'GET',
])
def get_log():
'''
args:
job_id: id of the remote job
response:
log: newest `LINE_NUM` lines of the log file
'''
try:
job_id = request.args['job_id']
except:
return make_response(
jsonify(message="No job_id provided, please check your request."),
400)
log_dir = current_app.config.get('LOG_DIR')
log_dir = os.path.expanduser(log_dir)
log_file_path = os.path.join(log_dir, job_id, 'stdout.log')
if not os.path.isfile(log_file_path):
return make_response(
jsonify(message="Log not exsits, please check your job_id"), 400)
else:
line_num = current_app.config.get('LINE_NUM')
linecache.checkcache(log_file_path)
log_content = ''.join(linecache.getlines(log_file_path)[-line_num:])
return make_response(
jsonify(message="Log exsits, content in log", log=log_content),
200)
@app.route(
'/download-log', methods=[
'GET',
])
def download_log():
'''
args:
job_id: the id of the remote job
response:
log: log file
'''
try:
job_id = request.args['job_id']
except:
return make_response(
jsonify(message="No job_id provided, please check your request."),
400)
log_dir = current_app.config.get('LOG_DIR')
log_dir = os.path.expanduser(log_dir)
log_file_path = os.path.join(log_dir, job_id, 'stdout.log')
if not os.path.isfile(log_file_path):
return make_response(
jsonify(message="Log not exsits, please check your job_id"), 400)
else:
return send_file(log_file_path, as_attachment=True)
if __name__ == "__main__":
import logging
log = logging.getLogger('werkzeug')
log.disabled = True
parser = argparse.ArgumentParser()
parser.add_argument('--port', required=True, type=int)
parser.add_argument('--log_dir', required=True, type=str)
parser.add_argument('--line_num', required=True, type=int)
args = parser.parse_args()
app.config.from_mapping(
LOG_DIR=args.log_dir,
LINE_NUM=args.line_num,
)
app.run(host="0.0.0.0", port=args.port)
...@@ -57,9 +57,10 @@ class Master(object): ...@@ -57,9 +57,10 @@ class Master(object):
port: The ip port that the master node binds to. port: The ip port that the master node binds to.
""" """
def __init__(self, port): def __init__(self, port, monitor_port=None):
self.ctx = zmq.Context() self.ctx = zmq.Context()
self.master_ip = get_ip_address() self.master_ip = get_ip_address()
self.monitor_url = "http://{}:{}".format(self.master_ip, monitor_port)
logger.set_dir( logger.set_dir(
os.path.expanduser('~/.parl_data/master/{}_{}'.format( os.path.expanduser('~/.parl_data/master/{}_{}'.format(
self.master_ip, port))) self.master_ip, port)))
...@@ -135,7 +136,7 @@ class Master(object): ...@@ -135,7 +136,7 @@ class Master(object):
except zmq.error.Again as e: except zmq.error.Again as e:
client_is_alive = False client_is_alive = False
self.cluster_monitor.drop_cluster_status( self.cluster_monitor.drop_client_status(
client_heartbeat_address) client_heartbeat_address)
logger.warning("[Master] cannot connect to the client " + logger.warning("[Master] cannot connect to the client " +
"{}. ".format(client_heartbeat_address) + "{}. ".format(client_heartbeat_address) +
...@@ -205,8 +206,11 @@ class Master(object): ...@@ -205,8 +206,11 @@ class Master(object):
# a client connects to the master # a client connects to the master
elif tag == remote_constants.CLIENT_CONNECT_TAG: elif tag == remote_constants.CLIENT_CONNECT_TAG:
# `client_heartbeat_address` is the
# `reply_master_heartbeat_address` of the client
client_heartbeat_address = to_str(message[1]) client_heartbeat_address = to_str(message[1])
client_hostname = to_str(message[2]) client_hostname = to_str(message[2])
client_id = to_str(message[3])
self.client_hostname[client_heartbeat_address] = client_hostname self.client_hostname[client_heartbeat_address] = client_hostname
logger.info( logger.info(
"Client {} is connected.".format(client_heartbeat_address)) "Client {} is connected.".format(client_heartbeat_address))
...@@ -215,11 +219,14 @@ class Master(object): ...@@ -215,11 +219,14 @@ class Master(object):
target=self._create_client_monitor, target=self._create_client_monitor,
args=(client_heartbeat_address, )) args=(client_heartbeat_address, ))
thread.start() thread.start()
self.client_socket.send_multipart([remote_constants.NORMAL_TAG]) log_monitor_address = "{}/logs?client_id={}".format(
self.monitor_url, client_id)
self.client_socket.send_multipart(
[remote_constants.NORMAL_TAG,
to_byte(log_monitor_address)])
# a client submits a job to the master # a client submits a job to the master
elif tag == remote_constants.CLIENT_SUBMIT_TAG: elif tag == remote_constants.CLIENT_SUBMIT_TAG:
# check available CPU resources # check available CPU resources
if self.cpu_num: if self.cpu_num:
logger.info("Submitting job...") logger.info("Submitting job...")
...@@ -230,6 +237,9 @@ class Master(object): ...@@ -230,6 +237,9 @@ class Master(object):
to_byte(job.client_heartbeat_address), to_byte(job.client_heartbeat_address),
to_byte(job.ping_heartbeat_address), to_byte(job.ping_heartbeat_address),
]) ])
client_id = to_str(message[2])
job_info = {job.job_id: job.log_server_address}
self.cluster_monitor.add_client_job(client_id, job_info)
self._print_workers() self._print_workers()
else: else:
self.client_socket.send_multipart([remote_constants.CPU_TAG]) self.client_socket.send_multipart([remote_constants.CPU_TAG])
......
...@@ -14,9 +14,15 @@ ...@@ -14,9 +14,15 @@
class InitializedJob(object): class InitializedJob(object):
def __init__(self, job_address, worker_heartbeat_address, def __init__(self,
client_heartbeat_address, ping_heartbeat_address, job_address,
worker_address, pid): worker_heartbeat_address,
client_heartbeat_address,
ping_heartbeat_address,
worker_address,
pid,
job_id=None,
log_server_address=None):
""" """
Args: Args:
job_address(str): Job address to which the new task connect. job_address(str): Job address to which the new task connect.
...@@ -35,6 +41,8 @@ class InitializedJob(object): ...@@ -35,6 +41,8 @@ class InitializedJob(object):
self.worker_address = worker_address self.worker_address = worker_address
self.pid = pid self.pid = pid
self.is_alive = True self.is_alive = True
self.job_id = job_id
self.log_server_address = log_server_address
class InitializedWorker(object): class InitializedWorker(object):
......
...@@ -19,7 +19,7 @@ import time ...@@ -19,7 +19,7 @@ import time
import zmq import zmq
import threading import threading
from flask import Flask, render_template, jsonify from flask import Flask, render_template, jsonify, request
app = Flask(__name__) app = Flask(__name__)
...@@ -81,6 +81,7 @@ class ClusterMonitor(object): ...@@ -81,6 +81,7 @@ class ClusterMonitor(object):
data['total_vacant_cpus'] = total_vacant_cpus data['total_vacant_cpus'] = total_vacant_cpus
data['total_cpus'] = total_used_cpus + total_vacant_cpus data['total_cpus'] = total_used_cpus + total_vacant_cpus
data['clients'] = list(status['clients'].values()) data['clients'] = list(status['clients'].values())
data['client_jobs'] = status['client_jobs']
self.data = data self.data = data
time.sleep(10) time.sleep(10)
...@@ -99,6 +100,39 @@ def cluster(): ...@@ -99,6 +100,39 @@ def cluster():
return jsonify(data) return jsonify(data)
@app.route(
'/logs', methods=[
'GET',
])
def logs():
client_id = request.args.get('client_id')
return render_template('jobs.html', client_id=client_id)
@app.route(
'/get-jobs', methods=[
'GET',
])
def get_jobs():
client_id = request.args.get('client_id')
jobs = CLUSTER_MONITOR.get_data()['client_jobs'].get(client_id)
data = []
if jobs:
for idx, job_id in enumerate(jobs):
monitor_url = jobs[job_id]
data.append({
"id":
idx,
"job_id":
job_id,
"log_url":
"http://{}/get-log?job_id={}".format(monitor_url, job_id),
"download_url":
"http://{}/download-log?job_id={}".format(monitor_url, job_id),
})
return jsonify(data)
if __name__ == "__main__": if __name__ == "__main__":
import logging import logging
log = logging.getLogger('werkzeug') log = logging.getLogger('werkzeug')
......
...@@ -18,7 +18,7 @@ import multiprocessing ...@@ -18,7 +18,7 @@ import multiprocessing
import os import os
import random import random
import re import re
import socket import requests
import subprocess import subprocess
import sys import sys
import time import time
...@@ -27,7 +27,9 @@ import tempfile ...@@ -27,7 +27,9 @@ import tempfile
import warnings import warnings
import zmq import zmq
from multiprocessing import Process from multiprocessing import Process
from parl.utils import get_ip_address, to_str, _IS_WINDOWS, kill_process from parl.utils import (_IS_WINDOWS, get_free_tcp_port, get_ip_address,
get_port_from_range, is_port_available, kill_process,
to_str)
from parl.remote.remote_constants import STATUS_TAG from parl.remote.remote_constants import STATUS_TAG
# A flag to mark if parl is started from a command line # A flag to mark if parl is started from a command line
...@@ -47,26 +49,6 @@ if sys.version_info.major == 3: ...@@ -47,26 +49,6 @@ if sys.version_info.major == 3:
warnings.simplefilter("ignore", ResourceWarning) warnings.simplefilter("ignore", ResourceWarning)
def get_free_tcp_port():
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(('', 0))
addr, port = tcp.getsockname()
tcp.close()
return str(port)
def is_port_available(port):
""" Check if a port is used.
True if the port is available for connection.
"""
port = int(port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
available = sock.connect_ex(('localhost', port))
sock.close()
return available
def is_master_started(address): def is_master_started(address):
ctx = zmq.Context() ctx = zmq.Context()
socket = ctx.socket(zmq.REQ) socket = ctx.socket(zmq.REQ)
...@@ -83,6 +65,33 @@ def is_master_started(address): ...@@ -83,6 +65,33 @@ def is_master_started(address):
return False return False
def parse_port_range(log_server_port_range):
try:
re.match(r'\d*[-]\d*', log_server_port_range).span()
except:
raise Exception(
"The input log_server_port_range should be `start-end` format.")
start, end = map(int, log_server_port_range.split('-'))
if start > end:
raise Exception(
"Start port number must be smaller than the end port number.")
return start, end
def is_log_server_started(ip_address, port):
started = False
for _ in range(3):
try:
r = requests.get("http://{}:{}/get-log".format(ip_address, port))
if r.status_code == 400:
started = True
break
except:
time.sleep(3)
return started
@click.group() @click.group()
def cli(): def cli():
pass pass
...@@ -101,7 +110,15 @@ def cli(): ...@@ -101,7 +110,15 @@ def cli():
"cpus of this machine.") "cpus of this machine.")
@click.option( @click.option(
"--monitor_port", help="The port to start a cluster monitor.", type=str) "--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port, debug): @click.option(
"--log_server_port_range",
help='''
Port range (start-end) of the log server on the worker. Default: 8000-9000.
The worker will pick a random avaliable port in [start, end] for the log server.
''',
default="8000-9000",
type=str)
def start_master(port, cpu_num, monitor_port, debug, log_server_port_range):
if debug: if debug:
os.environ['DEBUG'] = 'True' os.environ['DEBUG'] = 'True'
...@@ -122,14 +139,26 @@ def start_master(port, cpu_num, monitor_port, debug): ...@@ -122,14 +139,26 @@ def start_master(port, cpu_num, monitor_port, debug):
monitor_file = monitor_file.replace('scripts.py', 'monitor.py') monitor_file = monitor_file.replace('scripts.py', 'monitor.py')
monitor_port = monitor_port if monitor_port else get_free_tcp_port() monitor_port = monitor_port if monitor_port else get_free_tcp_port()
start, end = parse_port_range(log_server_port_range)
log_server_port = get_port_from_range(start, end)
while log_server_port == monitor_port or log_server_port == port:
log_server_port = get_port_from_range(start, end)
master_command = [ master_command = [
sys.executable, start_file, "--name", "master", "--port", port sys.executable,
start_file,
"--name",
"master",
"--port",
port,
"--monitor_port",
monitor_port,
] ]
worker_command = [ worker_command = [
sys.executable, start_file, "--name", "worker", "--address", sys.executable, start_file, "--name", "worker", "--address",
"localhost:" + str(port), "--cpu_num", "localhost:" + str(port), "--cpu_num",
str(cpu_num) str(cpu_num), '--log_server_port',
str(log_server_port)
] ]
monitor_command = [ monitor_command = [
sys.executable, monitor_file, "--monitor_port", sys.executable, monitor_file, "--monitor_port",
...@@ -216,6 +245,9 @@ def start_master(port, cpu_num, monitor_port, debug): ...@@ -216,6 +245,9 @@ def start_master(port, cpu_num, monitor_port, debug):
""".format(start_info, master_ip, port) """.format(start_info, master_ip, port)
click.echo(monitor_info) click.echo(monitor_info)
if not is_log_server_started(master_ip, log_server_port):
click.echo("# Fail to start the log server.")
@click.command("connect", short_help="Start a worker node.") @click.command("connect", short_help="Start a worker node.")
@click.option( @click.option(
...@@ -225,7 +257,18 @@ def start_master(port, cpu_num, monitor_port, debug): ...@@ -225,7 +257,18 @@ def start_master(port, cpu_num, monitor_port, debug):
type=int, type=int,
help="Set number of cpu manually. If not set, it will use all " help="Set number of cpu manually. If not set, it will use all "
"cpus of this machine.") "cpus of this machine.")
def start_worker(address, cpu_num): @click.option(
"--log_server_port_range",
help='''
Port range (start-end) of the log server on the worker. Default: 8000-9000.
The worker will pick a random avaliable port in [start, end] for the log server.
''',
default="8000-9000",
type=str)
def start_worker(address, cpu_num, log_server_port_range):
start, end = parse_port_range(log_server_port_range)
log_server_port = get_port_from_range(start, end)
if not is_master_started(address): if not is_master_started(address):
raise Exception("Worker can not connect to the master node, " + raise Exception("Worker can not connect to the master node, " +
"please check if the input address {} ".format( "please check if the input address {} ".format(
...@@ -237,16 +280,21 @@ def start_worker(address, cpu_num): ...@@ -237,16 +280,21 @@ def start_worker(address, cpu_num):
command = [ command = [
sys.executable, start_file, "--name", "worker", "--address", address, sys.executable, start_file, "--name", "worker", "--address", address,
"--cpu_num", "--cpu_num",
str(cpu_num) str(cpu_num), "--log_server_port",
str(log_server_port)
] ]
p = subprocess.Popen(command) p = subprocess.Popen(command)
if not is_log_server_started(get_ip_address(), log_server_port):
click.echo("# Fail to start the log server.")
@click.command("stop", help="Exit the cluster.") @click.command("stop", help="Exit the cluster.")
def stop(): def stop():
kill_process('remote/start.py') kill_process('remote/start.py')
kill_process('remote/job.py') kill_process('remote/job.py')
kill_process('remote/monitor.py') kill_process('remote/monitor.py')
kill_process('remote/log_server.py')
@click.command("status") @click.command("status")
......
...@@ -28,13 +28,15 @@ def main(args): ...@@ -28,13 +28,15 @@ def main(args):
if args.name == 'master': if args.name == 'master':
port = args.port port = args.port
master = Master(port) monitor_port = args.monitor_port
master = Master(port, monitor_port)
master.run() master.run()
elif args.name == 'worker': elif args.name == 'worker':
address = args.address address = args.address
log_server_port = args.log_server_port
cpu_num = int(args.cpu_num) if args.cpu_num else None cpu_num = int(args.cpu_num) if args.cpu_num else None
worker = Worker(address, cpu_num) worker = Worker(address, cpu_num, log_server_port)
worker.run() worker.run()
else: else:
...@@ -48,5 +50,7 @@ if __name__ == "__main__": ...@@ -48,5 +50,7 @@ if __name__ == "__main__":
parser.add_argument('--port', default='1234', type=str) parser.add_argument('--port', default='1234', type=str)
parser.add_argument('--address', default='localhost:1234', type=str) parser.add_argument('--address', default='localhost:1234', type=str)
parser.add_argument('--cpu_num', default='', type=str) parser.add_argument('--cpu_num', default='', type=str)
parser.add_argument('--monitor_port', default='', type=str)
parser.add_argument('--log_server_port', default='', type=str)
args = parser.parse_args() args = parser.parse_args()
main(args) main(args)
此差异已折叠。
/*
jQuery AJAX Cross Origin v1.3 (http://www.ajax-cross-origin.com)
jQuery plugin to bypass Same-origin_policy using Google Apps Script.
references:
http://en.wikipedia.org/wiki/Same-origin_policy
http://www.google.com/script/start/
(c) 2014, Writen by Erez Ninio. site: www.dealhotelbook.com
Licensed under the Creative Commons Attribution 3.0 Unported License.
For details, see http://creativecommons.org/licenses/by/3.0/.
*/
var proxyJsonp =
"https://script.google.com/macros/s/AKfycbwmqG55tt2d2FcT_WQ3WjCSKmtyFpkOcdprSITn45-4UgVJnzp9/exec";
jQuery.ajaxOrig = jQuery.ajax;
jQuery.ajax = function (a, b) {
function d(a) {
a = encodeURI(a).replace(/&/g, "%26");
return proxyJsonp + "?url=" + a + "&callback=?";
}
var c = "object" === typeof a ? a : b || {};
c.url = c.url || ("string" === typeof a ? a : "");
var c = jQuery.ajaxSetup({}, c),
e = (function (a, c) {
var b = document.createElement("a");
b.href = a;
return (
c.crossOrigin &&
"http" == a.substr(0, 4).toLowerCase() &&
"localhost" != b.hostname &&
"127.0.0.1" != b.hostname &&
b.hostname != window.location.hostname
);
})(c.url, c);
c.proxy &&
0 < c.proxy.length &&
((proxyJsonp = c.proxy),
"object" === typeof a
? (a.crossDomain = !0)
: "object" === typeof b && (b.crossDomain = !0));
e &&
("object" === typeof a
? a.url &&
((a.url = d(a.url)),
a.charset && (a.url += "&charset=" + a.charset),
(a.dataType = "json"))
: "string" === typeof a &&
"object" === typeof b &&
((a = d(a)),
b.charset && (a += "&charset=" + b.charset),
(b.dataType = "json")));
return jQuery.ajaxOrig.apply(this, arguments);
};
jQuery.ajax.prototype = new jQuery.ajaxOrig();
jQuery.ajax.prototype.constructor = jQuery.ajax;
...@@ -185,7 +185,8 @@ function autoTable(res) { ...@@ -185,7 +185,8 @@ function autoTable(res) {
var s3 = `<td>${res.clients[i].client_address}</td>`; var s3 = `<td>${res.clients[i].client_address}</td>`;
var s4 = `<td>${res.clients[i].actor_num}</td>`; var s4 = `<td>${res.clients[i].actor_num}</td>`;
var s5 = `<td>${res.clients[i].time}</td>`; var s5 = `<td>${res.clients[i].time}</td>`;
tr.innerHTML = s1 + s2 + s3 + s4 + s5; var s6 = `<td><a href=${res.clients[i].log_monitor_url}>link</a></td>`;
tr.innerHTML = s1 + s2 + s3 + s4 + s5 + s6;
table.appendChild(tr); table.appendChild(tr);
} }
}; };
...@@ -43,10 +43,11 @@ ...@@ -43,10 +43,11 @@
<th scope="col">Hostname</th> <th scope="col">Hostname</th>
<th scope="col">Actor Num</th> <th scope="col">Actor Num</th>
<th scope="col">Time (min)</th> <th scope="col">Time (min)</th>
<th scope="col">Log</th>
</tr> </tr>
</thead> </thead>
<tbody id='table'> <tbody id='table'>
<th colspan="5">Loading Data...</th> <th colspan="6">Loading Data...</th>
</tbody> </tbody>
</table> </table>
</div> </div>
......
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>Parl Cluster</title>
<link rel="shortcut icon" href="../static/favicon.ico" />
<script type="text/javascript" src="../static/js/jquery.min.js"></script>
<script type="text/javascript" src="../static/js/ansi_up.js"></script>
<script
type="text/javascript"
src="../static/js/bootstrap-table.min.js"
></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/4.5.0/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" href="../static/css/bootstrap-parl.min.css" />
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-dark fixed-top">
<div class="container">
<a class="navbar-brand">
<img src="../static/logo.png" style="height: 30px;" />
</a>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item" id="worker_nav">
<a class="btn text-white" href="workers">Worker</a>
</li>
<li class="nav-item" id="client_nav">
<a class="btn text-white" href="clients">Client</a>
</li>
</ul>
</div>
</div>
</nav>
<div class="container" id="main-container">
<h5 class="font-weight-light text-center text-lg-left mt-4 mb-4">
Jobs Monitor
</h5>
<div class="card">
<div class="card-header" style="display: inline;">
<h3 style="display: inline;">
Remote Job Log
</h3>
<p
style="
float: right;
margin-bottom: 0rem;
position: relative;
bottom: -0.5rem;
"
>
Client ID: {{ client_id }}
</p>
</div>
</div>
<table id="table"></table>
</div>
<!-- Modal -->
<div
class="modal fade"
id="log-modal"
tabindex="-1"
role="dialog"
aria-hidden="true"
>
<div
class="modal-dialog modal-lg modal-dialog-scrollable"
role="document"
>
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title" id="log-modal-title">
Job ID:
</h5>
<button
type="button"
class="close"
data-dismiss="modal"
aria-label="Close"
></button>
</div>
<div class="modal-body">
<p id="log-content">
<div id="loading-spin" class="spinner-border text-primary" role="status">
<span class="sr-only">Loading...</span>
</div>
</p>
</div>
<div class="modal-footer">
<a style="position: relative; left: -160px; font-size: small;">
* Only the lastest 500 lines of the log are shown, <br />
download the log file for the full log.
</a>
<button
type="button"
class="btn btn-secondary"
data-dismiss="modal"
>
Close
</button>
<a
role="button"
id="download-btn"
type="button"
class="btn btn-primary"
href=""
>
Download the complete log
</a>
</div>
</div>
</div>
</div>
<script>
let client_id = "{{ client_id }}";
let $table = $("#table");
function initTable() {
$table.bootstrapTable({
url: "get-jobs?client_id={{ client_id }}",
pagination: true,
pageSize: 10,
pageList: [10, 25, 50, 100],
columns: [
{
field: "id",
title: "ID",
},
{
field: "job_id",
title: "Job ID",
},
{
field: "log_url",
title: "Log",
formatter: urlButtonFormatter,
},
{
field: "download_url",
title: "Download",
formatter: downloadButtonFormatter,
},
],
});
}
function urlButtonFormatter(value, row, index) {
let job_id = value.split("?job_id=")[1];
return `<a
role="button"
data-toggle="modal"
data-target="#log-modal"
data-job-url="${value}"
data-job-id="${job_id}"
class="btn btn-sm btn-outline-primary"
href="">view</a>`;
}
function downloadButtonFormatter(value, row, index) {
return `<a
role="button"
class="btn btn-sm btn-outline-primary"
href=${value}>link</a>`;
}
let refresher_id = null;
let ansi_up = new AnsiUp();
$("#log-modal").on("show.bs.modal", function (e) {
let job_id = $(e.relatedTarget).data("job-id");
let job_url = $(e.relatedTarget).data("job-url");
$("#log-modal-title").text("Job ID: " + job_id);
$("#download-btn").attr("href", job_url.replace("get", "download"));
refresher_id = setInterval(() => {
$.get(job_url, function (data, status) {
html = ansi_up
.ansi_to_html(data.log)
.replace(/\r\n/g, "<br>")
.replace(/\n/g, "<br>");
$("#loading-spin").hide();
$("#log-content").html(html);
});
}, 1000);
});
$("#log-modal").on("hide.bs.modal", function (e) {
clearInterval(refresher_id);
});
$(document).ready(initTable);
setInterval(() => {
$table.bootstrapTable("refresh");
}, 10000);
</script>
</body>
</html>
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import multiprocessing
import os
import pickle
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import requests
import parl
from parl.remote.client import disconnect, get_global_client
from parl.remote.master import Master
from parl.remote.worker import Worker
from parl.utils import _IS_WINDOWS, get_free_tcp_port
@parl.remote_class
class Actor(object):
def __init__(self, number=None, arg1=None, arg2=None):
self.number = number
self.arg1 = arg1
self.arg2 = arg2
def sim_output(self, start, end):
output = ""
print(self.number)
output += str(self.number)
output += "\n"
for i in range(start, end):
print(i)
output += str(i)
output += "\n"
return output
class TestLogServer(unittest.TestCase):
def tearDown(self):
disconnect()
#In windows, multiprocessing.Process cannot run the method of class, but static method is ok.
@staticmethod
def _connect_and_create_actor(cluster_addr):
parl.connect(cluster_addr)
outputs = []
for i in range(2):
actor = Actor(number=i)
ret = actor.sim_output(1, 4)
assert ret != ""
outputs.append(ret)
return outputs
def test_log_server(self):
master_port = 8401
# start the master
master = Master(port=master_port)
th = threading.Thread(target=master.run)
th.start()
time.sleep(1)
cluster_addr = 'localhost:{}'.format(master_port)
log_server_port = 8402
worker = Worker(cluster_addr, 4, log_server_port=log_server_port)
outputs = self._connect_and_create_actor(cluster_addr)
# Get status
status = master._get_status()
client_jobs = pickle.loads(status).get('client_jobs')
self.assertIsNotNone(client_jobs)
# Get job id
client = get_global_client()
jobs = client_jobs.get(client.client_id)
self.assertIsNotNone(jobs)
for job_id, log_server_addr in jobs.items():
log_url = "http://{}/get-log".format(log_server_addr)
# Test response without job_id
r = requests.get(log_url)
self.assertEqual(r.status_code, 400)
# Test normal response
r = requests.get(log_url, params={'job_id': job_id})
self.assertEqual(r.status_code, 200)
log_content = json.loads(r.text).get('log')
self.assertIsNotNone(log_content)
self.assertIn(log_content, outputs)
# Test download
download_url = "http://{}/download-log".format(log_server_addr)
r = requests.get(download_url, params={'job_id': job_id})
self.assertEqual(r.status_code, 200)
self.assertIn(r.text, outputs)
disconnect()
worker.exit()
master.exit()
def test_monitor_query_log_server(self):
master_port = 8403
monitor_port = 8404
# start the master
master = Master(port=master_port, monitor_port=monitor_port)
th = threading.Thread(target=master.run)
th.start()
time.sleep(1)
# start the cluster monitor
monitor_file = __file__.replace('tests/log_server_test.pyc',
'monitor.py')
monitor_file = monitor_file.replace('tests/log_server_test.py',
'monitor.py')
command = [
sys.executable, monitor_file, "--monitor_port",
str(monitor_port), "--address", "localhost:" + str(master_port)
]
if _IS_WINDOWS:
FNULL = tempfile.TemporaryFile()
else:
FNULL = open(os.devnull, 'w')
monitor_proc = subprocess.Popen(
command,
stdout=FNULL,
stderr=subprocess.STDOUT,
)
# Start worker
cluster_addr = 'localhost:{}'.format(master_port)
log_server_port = 8405
worker = Worker(cluster_addr, 4, log_server_port=log_server_port)
# Test monitor API
outputs = self._connect_and_create_actor(cluster_addr)
time.sleep(5) # Wait for the status update
client = get_global_client()
jobs_url = "{}/get-jobs?client_id={}".format(master.monitor_url,
client.client_id)
r = requests.get(jobs_url)
self.assertEqual(r.status_code, 200)
data = json.loads(r.text)
for job in data:
log_url = job.get('log_url')
self.assertIsNotNone(log_url)
r = requests.get(log_url)
self.assertEqual(r.status_code, 200)
log_content = json.loads(r.text).get('log')
self.assertIsNotNone(log_content)
self.assertIn(log_content, outputs)
# Test download
download_url = job.get('download_url')
r = requests.get(download_url)
self.assertEqual(r.status_code, 200)
self.assertIn(r.text, outputs)
# Clean context
monitor_proc.kill()
monitor_proc.wait()
disconnect()
worker.exit()
master.exit()
if __name__ == '__main__':
unittest.main()
...@@ -20,6 +20,7 @@ import signal ...@@ -20,6 +20,7 @@ import signal
import socket import socket
import subprocess import subprocess
import sys import sys
import tempfile
import time import time
import threading import threading
import warnings import warnings
...@@ -63,7 +64,7 @@ class Worker(object): ...@@ -63,7 +64,7 @@ class Worker(object):
cpu_num (int): Number of cpu to be used on the worker. cpu_num (int): Number of cpu to be used on the worker.
""" """
def __init__(self, master_address, cpu_num=None): def __init__(self, master_address, cpu_num=None, log_server_port=None):
self.lock = threading.Lock() self.lock = threading.Lock()
self.heartbeat_socket_initialized = threading.Event() self.heartbeat_socket_initialized = threading.Event()
self.ctx = zmq.Context.instance() self.ctx = zmq.Context.instance()
...@@ -75,6 +76,9 @@ class Worker(object): ...@@ -75,6 +76,9 @@ class Worker(object):
self._set_cpu_num(cpu_num) self._set_cpu_num(cpu_num)
self.job_buffer = queue.Queue(maxsize=self.cpu_num) self.job_buffer = queue.Queue(maxsize=self.cpu_num)
self._create_sockets() self._create_sockets()
# create log server
self.log_server_proc, self.log_server_address = self._create_log_server(
port=log_server_port)
# create a thread that waits commands from the job to kill the job. # create a thread that waits commands from the job to kill the job.
self.kill_job_thread = threading.Thread(target=self._reply_kill_job) self.kill_job_thread = threading.Thread(target=self._reply_kill_job)
...@@ -192,7 +196,8 @@ class Worker(object): ...@@ -192,7 +196,8 @@ class Worker(object):
job_file = job_file.replace('worker.py', 'job.py') job_file = job_file.replace('worker.py', 'job.py')
command = [ command = [
sys.executable, job_file, "--worker_address", sys.executable, job_file, "--worker_address",
self.reply_job_address self.reply_job_address, "--log_server_address",
self.log_server_address
] ]
if sys.version_info.major == 3: if sys.version_info.major == 3:
...@@ -354,10 +359,40 @@ class Worker(object): ...@@ -354,10 +359,40 @@ class Worker(object):
"[Worker] lost connection with the master, will exit reply heartbeat for master." "[Worker] lost connection with the master, will exit reply heartbeat for master."
) )
self.worker_status.clear() self.worker_status.clear()
self.log_server_proc.kill()
self.log_server_proc.wait()
# exit the worker # exit the worker
self.worker_is_alive = False self.worker_is_alive = False
self.exit() self.exit()
def _create_log_server(self, port):
log_server_file = __file__.replace('worker.pyc', 'log_server.py')
log_server_file = log_server_file.replace('worker.py', 'log_server.py')
if port is None:
port = "0" # `0` means using a random port in flask
command = [
sys.executable, log_server_file, "--port",
str(port), "--log_dir", "~/.parl_data/job/", "--line_num", "500"
]
if sys.version_info.major == 3:
warnings.simplefilter("ignore", ResourceWarning)
if _IS_WINDOWS:
FNULL = tempfile.TemporaryFile()
else:
FNULL = open(os.devnull, 'w')
log_server_proc = subprocess.Popen(
command,
stdout=FNULL,
stderr=subprocess.STDOUT,
)
FNULL.close()
log_server_address = "{}:{}".format(self.worker_ip, port)
return log_server_proc, log_server_address
def exit(self): def exit(self):
"""close the worker""" """close the worker"""
self.worker_is_alive = False self.worker_is_alive = False
......
...@@ -14,11 +14,15 @@ ...@@ -14,11 +14,15 @@
import os import os
import platform import platform
import random
import socket import socket
import subprocess import subprocess
from parl.utils import logger, _HAS_FLUID, _IS_WINDOWS from parl.utils import logger, _HAS_FLUID, _IS_WINDOWS
__all__ = ['get_gpu_count', 'get_ip_address', 'is_gpu_available'] __all__ = [
'get_gpu_count', 'get_ip_address', 'is_gpu_available', 'get_free_tcp_port',
'is_port_available', 'get_port_from_range'
]
def get_ip_address(): def get_ip_address():
...@@ -100,3 +104,32 @@ def is_gpu_available(): ...@@ -100,3 +104,32 @@ def is_gpu_available():
But PARL found that Paddle was not complied with CUDA, which may cause issues." But PARL found that Paddle was not complied with CUDA, which may cause issues."
) )
return ret return ret
def get_free_tcp_port():
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(('', 0))
addr, port = tcp.getsockname()
tcp.close()
return str(port)
def is_port_available(port):
""" Check if a port is used.
True if the port is available for connection.
"""
port = int(port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
available = sock.connect_ex(('localhost', port))
sock.close()
return available
def get_port_from_range(start, end):
while True:
port = random.randint(start, end)
if is_port_available(port):
break
return port
...@@ -80,6 +80,7 @@ setup( ...@@ -80,6 +80,7 @@ setup(
"flask>=1.0.4", "flask>=1.0.4",
"click", "click",
"psutil>=5.6.2", "psutil>=5.6.2",
"flask_cors",
"visualdl>=2.0.0b;python_version>='3' and platform_system=='Linux'", "visualdl>=2.0.0b;python_version>='3' and platform_system=='Linux'",
], ],
classifiers=[ classifiers=[
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册