提交 1eb4f7de 编写于 作者: M markus 提交者: StephanEwen

Information Webfrontend for JobManager

上级 4285102e
......@@ -75,6 +75,29 @@ public final class ConfigConstants {
*/
public static final String JOBCLIENT_SHUTDOWN_TERMINATEJOB_KEY = "jobclient.shutdown.terminatejob";
// ------------------------ Web Frontend JobManager------------------------
/**
* The key for Stratosphere's base dir path
*/
public static final String STRATOSPHERE_BASE_DIR_PATH_KEY = "stratosphere.base.dir.path";
/**
* The key for the config parameter defining port for the pact web-frontend server.
*/
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
/**
* The key for the config parameter defining the directory containing the web documents.
*/
public static final String JOB_MANAGER_WEB_ROOT_PATH_KEY = "jobmanager.web.rootpath";
/**
* The key for the config parameter defining the port to the htaccess file protecting the web server.
*/
public static final String JOB_MANAGER_WEB_ACCESS_FILE_KEY = "jobmanager.web.access";
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
......@@ -124,6 +147,15 @@ public final class ConfigConstants {
*/
public static final boolean DEFAULT_JOBCLIENT_SHUTDOWN_TERMINATEJOB = true;
/**
*/
public static final int DEFAULT_WEB_FRONTEND_PORT = 8081;
/**
* The default path of the directory for info server containing the web documents.
*/
public static final String DEFAULT_JOB_MANAGER_WEB_ROOT_PATH = "./resources/web-docs-infoserver/";
// ----------------------------- Instances --------------------------------
/**
......@@ -136,6 +168,7 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
// ------------------------------------------------------------------------
/**
......
......@@ -19,9 +19,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.util.EnumUtils;
......@@ -417,4 +420,69 @@ public final class ManagementGroupVertex extends ManagementAttachment implements
public String toString() {
return String.format("ManagementGroupVertex(%s)", getName());
}
/**
* Returns Json representation of this ManagementGroupVertex
*
* @return
*/
public String toJson() {
StringBuilder json = new StringBuilder("");
json.append("{");
json.append("\"groupvertexid\": \"" + this.getID() + "\",");
json.append("\"groupvertexname\": \"" + this.getName() + "\",");
json.append("\"numberofgroupmembers\": " + this.getNumberOfGroupMembers() + ",");
json.append("\"groupmembers\": [");
// Count state status of group members
Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
// initialize with 0
for (ExecutionState state : ExecutionState.values()) {
stateCounts.put(state, new Integer(0));
}
for(int j = 0; j < this.getNumberOfGroupMembers(); j++) {
ManagementVertex vertex = this.getGroupMember(j);
json.append(vertex.toJson());
// print delimiter
if(j != this.getNumberOfGroupMembers() - 1) {
json.append(",");
}
// Increment state status count
Integer count = stateCounts.get(vertex.getExecutionState()) + new Integer(1);
stateCounts.put(vertex.getExecutionState(), count);
}
json.append("],");
json.append("\"backwardEdges\": [");
for(int edgeIndex = 0; edgeIndex < this.getNumberOfBackwardEdges(); edgeIndex++) {
ManagementGroupEdge edge = this.getBackwardEdge(edgeIndex);
json.append("{");
json.append("\"groupvertexid\": \"" + edge.getSource().getID() + "\",");
json.append("\"groupvertexname\": \"" + edge.getSource().getName() + "\",");
json.append("\"channelType\": \"" + edge.getChannelType() + "\"");
json.append("}");
// print delimiter
if(edgeIndex != this.getNumberOfBackwardEdges() - 1) {
json.append(",");
}
}
json.append("]");
// list number of members for each status
for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
json.append(",\""+stateCount.getKey()+"\": " + stateCount.getValue());
}
json.append("}");
return json.toString();
}
}
......@@ -331,4 +331,21 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
public String toString() {
return String.format("%s_%d", getGroupVertex().getName(), indexInGroup);
}
/**
* Returns Json representation of this ManagementVertex
*
* @return
*/
public String toJson() {
StringBuilder json = new StringBuilder("");
json.append("{");
json.append("\"vertexid\": \"" + this.getID() + "\",");
json.append("\"vertexname\": \"" + this + "\",");
json.append("\"vertexstatus\": \"" + this.getExecutionState() + "\",");
json.append("\"vertexinstancename\": \"" + this.getInstanceName() + "\",");
json.append("\"vertexinstancetype\": \"" + this.getInstanceType() + "\"");
json.append("}");
return json.toString();
}
}
......@@ -30,6 +30,24 @@
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>8.0.0.M1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<version>8.0.0.M1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>8.0.0.M1</version>
</dependency>
</dependencies>
<build>
......
*
{
padding: 0px;
margin: 0px;
}
html, body
{
height: 100%;
background-color: #E0EBFF;
}
h1
{
font-family: verdana;
font-size: 36px;
font-weight: normal;
font-decoration: none;
font-style: normal;
font-variant: small-caps;
}
h1 img {
vertical-align: middle;
margin-right: 50px;
}
h3
{
font-family: verdana;
font-size: 20px;
font-weight: normal;
font-decoration: none;
font-style: italic;
font-variant: normal;
text-align: center;
color: #666666;
margin: 10px 10px 20px 10px;
}
h4
{
font-family: Verdana;
font-size: 16px;
font-weight: normal;
font-decoration: none;
font-style: italic;
font-variant: normal;
text-align: left;
color: #000000;
margin: 5px 5px 10px 5px;
padding: 2px 2px 5px 2px;
border-bottom: 1px dashed #444444;
}
input
{
font-family: verdana, Sans-Serif;
font-size: 13px;
border: solid 1.5px #333333;
background-color: #eeeeee;
color: #333333;
padding: 5px;
margin: 5px;
}
.fadedPropertiesText
{
font-family: verdana;
font-size: 18px;
color: #AAAAAA;
font-style: italic;
}
.mainHeading
{
border: 1px solid #262A37;
margin: 5px;
text-align: left;
background-color: white;
background-image: url('../img/GradientBoxes.png');
background-repeat: no-repeat;
background-position: right;
height: 100px;
overflow: hidden;
}
.boxed
{
min-width: 900px;
max-width: 1150px;
margin: 0 auto;
border: 1px solid #262A37;
padding: 50px;
background-color: #ffffff;
-moz-box-shadow: 5px 5px 3px #888;
-webkit-box-shadow: 5px 5px 3px #888;
box-shadow: 5px 5px 3px #888;
}
.spaced
{
margin: 5px;
}
.footer
{
position: absolute;
left: 0px;
right: 0px;
bottom: 0px;
}
.error_text
{
font-family: verdana;
font-size: 14px;
font-weight: normal;
font-decoration: none;
font-style: italic;
font-variant: normal;
text-align: center;
color: #DF0101;
}
.translucent
{
-moz-opacity:0 ;
filter:alpha(opacity: 0);
opacity: 0;
}
.formLabel
{
font-family: verdana;
font-size: 14px;
font-weight: normal;
font-decoration: none;
font-style: normal;
font-variant: normal;
text-align: right;
color: #000000;
}
.jobListItem
{
border: 1px dashed #262A37;
height: 30px;
word-wrap: break-word;
font-family: verdana;
font-size: 14px;
font-variant: normal;
font-decoration: none;
margin: 10px;
padding: 10px;
}
.jobListItemName
{
text-align: left;
font-weight: bold;
font-style: normal;
}
.jobListItemDate
{
text-align: right;
font-weight: normal;
font-style: italic;
margin-right: 10px;
}
.layoutTable
{
border: none;
margin: 0px;
padding: 0px;
}
table{
border-spacing: 3px;
border-style: none;
width: 100%;
}
td, th {
color: #000000;
font-size: 12px;
line-height: 24px;
}
th {
background-color: #5D79F6 ;
border-style: none;
color: #DDDDDD;
font-size: 9pt;
font-weight: bold;
padding: 3px;
text-align: left;
}
td {
background-color: #CED8F6;
padding: 3px;
}
td.starting, td.running, td.finished, td.canceled, td.failed {
width: 120px;
}
td.starting {
background-color: #DADADA;
#DADADA
}
td.starting .progressBar div {
background-color: #A2A2A2;
}
td.running {
background-color: #CDDDAC;
}
td.running .progressBar div {
background-color: #9BBB59;
}
td.finished {
background-color: #CED7FC;
}
td.finished .progressBar div {
background-color: #5d79f6;
}
td.canceled {
background-color: #FDE0C8;
}
td.canceled .progressBar div {
background-color: #f79646;
}
td.failed {
background-color: #FAD3D3;
}
td.failed .progressBar div {
background-color: #e52424;
}
.subtable {
margin: 0 auto;
width: 800px;
background-color: #ffffff;
}
.subtable th {
background-color: #9EAFFA;
}
.progressBar {
width: 120px;
height: 22px;
}
.progressBar div {
height: 100%;
text-align: right;
line-height: 22px;
width: 0;
}
.opensub {
cursor: pointer
}
\ No newline at end of file
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
<title>Stratosphere JobManager</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<link rel="stylesheet" type="text/css" href="css/nephelefrontend.css">
<script type="text/javascript" src="js/jquery.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
<script type="text/javascript">
var jsonGlobal;
var widthProgressbar = 120;
(function poll() {
$.ajax({
url: "/jobsInfo",
type: "GET",
success: function(json) {
jsonGlobal = json
// Fill Table
$.each(json, function(i, job) {
var countGroups = 0;
var countTasks = 0;
var countStarting = 0;
var countRunning = 0;
var countFinished = 0;
var countCanceled = 0;
var countFailed = 0;
$("#jobs").html("");
$("#jobs").append("<h3>"+job.jobname+" (ID: "+job.jobid+")");
var jobtable;
jobtable = "<table id=\""+job.jobid+"\">\
<tr>\
<th>Name</th>\
<th>Tasks</th>\
<th>Starting</th>\
<th>Running</th>\
<th>Finished</th>\
<th>Canceled</th>\
<th>Failed</th>\
</tr>";
$.each(job.groupvertices, function(j, groupvertex) {
countGroups++;
countTasks += groupvertex.numberofgroupmembers;
countStarting += (groupvertex.CREATED+groupvertex.SCHEDULED+groupvertex.ASSIGNED+groupvertex.READY+groupvertex.STARTING);
countRunning += groupvertex.RUNNING;
countFinished += groupvertex.FINISHING+groupvertex.FINISHED;
countCanceled += groupvertex.CANCELING+groupvertex.CANCELED;
countFailed += groupvertex.FAILED;
jobtable += "<tr>\
<td id=\""+groupvertex.groupvertexid+"\"><span class=\"opensub\" open=\"_"+groupvertex.groupvertexid+"\">"+groupvertex.groupvertexname+"</span></td>\
<td>"+groupvertex.numberofgroupmembers+"</td>\
<td class=\"starting\"><div class=\"progressBar\"><div style=\"width:"+((groupvertex.CREATED+groupvertex.SCHEDULED+groupvertex.ASSIGNED+groupvertex.READY+groupvertex.STARTING)/groupvertex.numberofgroupmembers)*widthProgressbar+"px\">"+(groupvertex.CREATED+groupvertex.SCHEDULED+groupvertex.ASSIGNED+groupvertex.READY+groupvertex.STARTING)+"</div></div></td>\
<td class=\"running\"><div class=\"progressBar\"><div style=\"width:"+(groupvertex.RUNNING / groupvertex.numberofgroupmembers) *widthProgressbar+"px\">"+groupvertex.RUNNING+"</div></div></td>\
<td class=\"finished\"><div class=\"progressBar\"><div style=\"width:"+((groupvertex.FINISHING+groupvertex.FINISHED)/groupvertex.numberofgroupmembers)*widthProgressbar+"px\">"+(groupvertex.FINISHING+groupvertex.FINISHED)+"</div></div></td>\
<td class=\"canceled\"><div class=\"progressBar\"><div style=\"width:"+((groupvertex.CANCELING+groupvertex.CANCELED)/groupvertex.numberofgroupmembers)*widthProgressbar+"px\">"+(groupvertex.CANCELING+groupvertex.CANCELED)+"</div></div></td>\
<td class=\"failed\"><div class=\"progressBar\"><div style=\"width:"+(groupvertex.FAILED / groupvertex.numberofgroupmembers) *widthProgressbar+"px\">"+groupvertex.FAILED+"</div></div></td>\
</tr>\
<td colspan=8 id=\"_"+groupvertex.groupvertexid+"\" style=\"display:none\">\
<table class=\"subtable\">\
<tr>\
<th>Name</th>\
<th>status</th>\
<th>instancename</th>\
<th>instancetype</th>\
</tr>";
$.each(groupvertex.groupmembers, function(k, vertex) {
jobtable += "<tr>\
<td>"+vertex.vertexname+"</td>\
<td>"+vertex.vertexstatus+"</td>\
<td>"+vertex.vertexinstancename+"</td>\
<td>"+vertex.vertexinstancetype+"</td>\
</tr>";
});
jobtable += "</table></td></tr>";
});
jobtable += "<tr>\
<td colspan=\"2\" align=\"center\">Sum</td>\
<td>"+countTasks+"</td>\
<td class=\"starting\"><div class=\"progressBar\"><div style=\"width:"+(countStarting/countTasks)*widthProgressbar+"px\">"+countStarting+"</div></div></td>\
<td class=\"running\"><div class=\"progressBar\"><div style=\"width:"+(countRunning / countTasks) *widthProgressbar+"px\">"+countRunning+"</div></div></td>\
<td class=\"finished\"><div class=\"progressBar\"><div style=\"width:"+(countFinished/countTasks)*widthProgressbar+"px\">"+countFinished+"</div></div></td>\
<td class=\"canceled\"><div class=\"progressBar\"><div style=\"width:"+(countCanceled/countTasks)*widthProgressbar+"px\">"+countCanceled+"</div></div></td>\
<td class=\"failed\"><div class=\"progressBar\"><div style=\"width:"+(countFailed / countTasks) *widthProgressbar+"px\">"+countFailed+"</div></div></td>\
</tr>";
jobtable += "</table>"
$("#jobs").append(jobtable);
$("#"+job.jobid).prepend("<tr><td width=\"100\" rowspan="+(countGroups*2+2)+"><canvas id=\"dependencies"+job.jobid+"\" height=\""+($("#"+job.jobid).height())+"\" width=\"100\"></canvas></td>");
});
drawDependencies(json);
},
dataType: "json",
//complete: setTimeout(function() {poll()}, 5000),
//timeout: 2000
});
})();
/*
* Toggle ExecutionVectors
*/
$(".opensub").live("click", function() {
var id = $(this).attr("open");
$("#"+id).toggle();
drawDependencies();
});
/*
* Draw graph on left side beside table
*/
function drawDependencies() {
$("#dependencies")
$.each(jsonGlobal, function(i, job) {
$("#dependencies"+job.jobid).clearCanvas();
$("#dependencies"+job.jobid).attr("height", 10);
$("#dependencies"+job.jobid).attr("height", ($("#"+job.jobid).height()));
$.each(job.groupvertices, function(j, groupvertex) {
edgeCount = 0;
$.each(groupvertex.backwardEdges, function(k, edge) {
var y1 = ($("#"+edge.groupvertexid).offset().top - $("#dependencies"+job.jobid).offset().top)+15;
var y2 = ($("#"+groupvertex.groupvertexid).offset().top - $("#dependencies"+job.jobid).offset().top)+15;
var cy1 = y1 + (y2 - y1) / 2;
//var cx1 = 70 - (edgeCount / groupvertex.numberofgroupmembers) * 140;
var cx1 = 0;
edgeCount++;
var strokeWidth = 2;
if(edge.channelType == "NETWORK")
var strokeWidth = 4;
$("#dependencies"+job.jobid).drawQuadratic({
strokeStyle: "#5D79F6",
strokeWidth: strokeWidth,
x1: 100, y1: y1, // Start point
cx1: cx1, cy1: cy1, // Control point
x2: 100, y2: y2 // End point
});
});
});
});
}
</script>
</head>
<body>
<div class="mainHeading" style="min-width: 950x;">
<h1><img src="img/StratosphereLogo.png" width="326" height="100" alt="Stratosphere Logo" />Stratosphere JobManager</h1>
</div>
<a href="/logInfo" target="_blank" id="viewlog">View Log</a> | <a href="/logInfo?get=stdout" target="_blank" id="viewStdout">View Stdout</a>
<!-- the main panel with the jobs list and the preview pane -->
<div class="boxed">
<h2>Running Jobs</h2>
<div id="jobs">
</div>
</div>
<div id="lightbox" style="display: none">
<p>Click to close</p>
<div id="lightboxcontent">
</div>
</div>
</body>
</html>
......@@ -64,6 +64,7 @@ import eu.stratosphere.nephele.client.JobCancelResult;
import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.discovery.DiscoveryException;
......@@ -99,6 +100,7 @@ import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
import eu.stratosphere.nephele.jobmanager.web.WebInfoServer;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.multicast.MulticastManager;
......@@ -166,6 +168,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private volatile boolean isShutDown = false;
private WebInfoServer server;
public JobManager(ExecutionMode executionMode) {
final String ipcAddressString = GlobalConfiguration
......@@ -387,6 +391,13 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Create a new job manager object
JobManager jobManager = new JobManager(executionMode);
// Set base dir for info server
Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
infoserverConfig.setString(ConfigConstants.STRATOSPHERE_BASE_DIR_PATH_KEY, configDir+"/..");
// Start info server for jobmanager
jobManager.startInfoServer(infoserverConfig);
// Run the main task loop
jobManager.runTaskLoop();
......@@ -1166,4 +1177,21 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, sequenceNumber.getValue()));
}
/**
* Starts the Jetty Infoserver for the Jobmanager
*
* @param config
*/
public void startInfoServer(Configuration config) {
// Start InfoServer
try {
int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_WEB_FRONTEND_PORT);
server = new WebInfoServer(config, port, this);
server.start();
} catch (Exception e) {
LOG.fatal("Cannot instantiate info server: " + StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
}
}
}
package eu.stratosphere.nephele.jobmanager.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.io.EofException;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
import eu.stratosphere.nephele.jobmanager.JobManager;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementGroupVertex;
import eu.stratosphere.nephele.util.StringUtils;
public class JobmanagerInfoServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* The log for this class.
*/
private static final Log LOG = LogFactory.getLog(JobmanagerInfoServlet.class);
/**
* Underlying JobManager
*/
private final JobManager jobmanager;
public JobmanagerInfoServlet(JobManager jobmanager) {
this.jobmanager = jobmanager;
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
List<RecentJobEvent> jobs = jobmanager.getRecentJobs();
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("application/json");
PrintWriter wrt = resp.getWriter();
wrt.write("[");
// Loop Jobs
for (int i = 0; i < jobs.size(); i++) {
RecentJobEvent jobEvent = jobs.get(i);
ManagementGraph jobManagementGraph = jobmanager.getManagementGraph(jobEvent.getJobID());
//Serialize job to json
wrt.write("{");
wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
wrt.write("\"time\": " + jobEvent.getTimestamp()+",");
// Serialize ManagementGraph to json
wrt.write("\"groupvertices\": [");
boolean first = true;
for(ManagementGroupVertex groupVertex : jobManagementGraph.getGroupVerticesInTopologicalOrder()) {
//Write seperator between json objects
if(first) {
first = false;
} else {
wrt.write(","); }
wrt.write(groupVertex.toJson());
}
wrt.write("]");
wrt.write("}");
//Write seperator between json objects
if(i != jobs.size() - 1) {
wrt.write(",");
}
}
wrt.write("]");
} catch (EofException eof) { // Connection closed by client
LOG.info("Info server for jobmanager: Connection closed by client, EofException");
} catch (IOException ioe) { // Connection closed by client
LOG.info("Info server for jobmanager: Connection closed by client, IOException");
} catch (Exception e) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
resp.getWriter().print(e.getMessage());
if (LOG.isWarnEnabled()) {
LOG.warn(StringUtils.stringifyException(e));
}
}
}
}
package eu.stratosphere.nephele.jobmanager.web;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.util.StringUtils;
public class LogfileInfoServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* The log for this class.
*/
private static final Log LOG = LogFactory.getLog(LogfileInfoServlet.class);
private File logDir;
public LogfileInfoServlet(File logDir) {
this.logDir = logDir;
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
if("stdout".equals(req.getParameter("get"))) {
// Find current stdtout file
for(File f : logDir.listFiles()) {
// contains "jobmanager" ".log" and no number in the end ->needs improvement
if(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) )) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain ");
writeFile(resp.getOutputStream(), f);
break;
}
}
}
else {
// Find current logfile
for(File f : logDir.listFiles()) {
// contains "jobmanager" ".log" and no number in the end ->needs improvement
if(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) )) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain ");
writeFile(resp.getOutputStream(), f);
break;
}
}
}
} catch (Throwable t) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
resp.getWriter().print(t.getMessage());
if (LOG.isWarnEnabled()) {
LOG.warn(StringUtils.stringifyException(t));
}
}
}
private static void writeFile(OutputStream out, File file) throws IOException {
BufferedInputStream is = new BufferedInputStream(new FileInputStream(file));
byte[] buf = new byte[4 * 1024]; // 4K buffer
int bytesRead;
while ((bytesRead = is.read(buf)) != -1) {
out.write(buf, 0, bytesRead);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.jobmanager.web;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.http.security.Constraint;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.security.authentication.BasicAuthenticator;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobmanager.JobManager;
/**
* This class sets up a web-server that contains a web frontend to display information about running jobs.
* It instantiates and configures an embedded jetty server.
*/
public class WebInfoServer {
/**
* The log for this class.
*/
private static final Log LOG = LogFactory.getLog(WebInfoServer.class);
/**
* The jetty server serving all requests.
*/
private final Server server;
/**
* Port for info server
*/
private int port;
/**
* Underlying JobManager
*/
private final JobManager jobmanager;
/**
* Creates a new web info server. The server runs the servlets that implement the logic
* to list all present information concerning the job manager
*
* @param nepheleConfig
* The configuration for the nephele job manager.
* @param port
* The port to launch the server on.
* @throws IOException
* Thrown, if the server setup failed for an I/O related reason.
*/
public WebInfoServer(Configuration nepheleConfig, int port, JobManager jobmanager)
throws IOException {
Configuration config = GlobalConfiguration.getConfiguration();
// if no explicit configuration is given, use the global configuration
if (nepheleConfig == null) {
nepheleConfig = config;
}
this.jobmanager = jobmanager;
// get base path of Stratosphere installation
String basePath = nepheleConfig.getString(ConfigConstants.STRATOSPHERE_BASE_DIR_PATH_KEY,"");
File webDir;
String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
if(webDirPath.startsWith("/")) {
// absolute path
webDir = new File(webDirPath);
} else {
// path relative to base dir
webDir = new File(basePath+"/"+webDirPath);
}
if (LOG.isInfoEnabled()) {
LOG.info("Setting up web info server, using web-root directory '" + webDir.getAbsolutePath() + "'.");
//LOG.info("Web info server will store temporary files in '" + tmpDir.getAbsolutePath());
LOG.info("Web info server will display information about nephele job-manager on "
+ nepheleConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port "
+ port
+ ".");
}
this.port = port;
server = new Server(port);
// ensure that the directory with the web documents exists
if (!webDir.exists()) {
throw new FileNotFoundException("The directory containing the web documents does not exist: "
+ webDir.getAbsolutePath());
}
// ----- the handlers for the servlets -----
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContext.setContextPath("/");
servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(new File(basePath+"/log"))), "/logInfo");
// ----- the handler serving all the static files -----
ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setDirectoriesListed(false);
resourceHandler.setResourceBase(webDir.getAbsolutePath());
// ----- add the handlers to the list handler -----
HandlerList handlers = new HandlerList();
handlers.addHandler(servletContext);
handlers.addHandler(resourceHandler);
// ----- create the login module with http authentication -----
File af = null;
String authFile = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_FILE_KEY, null);
if (authFile != null) {
af = new File(authFile);
if (!af.exists()) {
LOG.error("The specified file '" + af.getAbsolutePath()
+ "' with the authentication information is missing. Starting server without HTTP authentication.");
af = null;
}
}
if (af != null) {
HashLoginService loginService = new HashLoginService("Stratosphere Jobmanager Interface", authFile);
server.addBean(loginService);
Constraint constraint = new Constraint();
constraint.setName(Constraint.__BASIC_AUTH);
constraint.setAuthenticate(true);
constraint.setRoles(new String[] { "user" });
ConstraintMapping mapping = new ConstraintMapping();
mapping.setPathSpec("/*");
mapping.setConstraint(constraint);
ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
sh.addConstraintMapping(mapping);
sh.setAuthenticator(new BasicAuthenticator());
sh.setLoginService(loginService);
sh.setStrict(true);
// set the handers: the server hands the request to the security handler,
// which hands the request to the other handlers when authenticated
sh.setHandler(handlers);
server.setHandler(sh);
} else {
server.setHandler(handlers);
}
}
/**
* Starts the web frontend server.
*
* @throws Exception
* Thrown, if the start fails.
*/
public void start() throws Exception {
LOG.info("Starting web info server for JobManager on port " + this.port);
server.start();
}
}
......@@ -48,24 +48,18 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>8.0.0.M1</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<version>8.0.0.M1</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>8.0.0.M1</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
......
文件模式从 100755 更改为 100644
文件模式从 100755 更改为 100644
文件模式从 100755 更改为 100644
......@@ -36,7 +36,7 @@
The previous command did not work because it also excludes libraries that should be in lib, such as commons-io.
-->
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.eclipse.jetty:jetty-*</exclude>
<!-- <exclude>org.eclipse.jetty:jetty-*</exclude>-->
</excludes>
</dependencySet>
......@@ -129,6 +129,16 @@
</excludes>
</fileSet>
<fileSet>
<!-- copy files for Jobmanager web frontend -->
<directory>../nephele/nephele-server/resources</directory>
<outputDirectory>resources</outputDirectory>
<fileMode>0644</fileMode>
<excludes>
<exclude>*etc/users</exclude>
</excludes>
</fileSet>
<fileSet>
<!-- copy jar files of pact job examples -->
<directory>../pact/pact-examples/target</directory>
......
......@@ -29,6 +29,8 @@ jobmanager.rpc.numhandler: 8
# JVM heap size in MB
jobmanager.heap.mb: 256
jobmanager.web.port: 8081
#=======================================================================================================================
# TASK MANAGER (WORKERs)
#=======================================================================================================================
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册