Commit 489700c8 authored by zsxwing, committed by Tathagata Das

[SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics

This is the initial work of SPARK-6939. Not yet ready for code review. Here are the screenshots:

![graph1](https://cloud.githubusercontent.com/assets/1000778/7165766/465942e0-e3dc-11e4-9b05-c184b09d75dc.png)

![graph2](https://cloud.githubusercontent.com/assets/1000778/7165779/53f13f34-e3dc-11e4-8714-a4a75b7e09ff.png)

TODOs:
- [x] Display more information on mouse hover
- [x] Align the timeline and distribution graphs
- [x] Clean up the code

Author: zsxwing <zsxwing@gmail.com>

Closes #5533 from zsxwing/SPARK-6939 and squashes the following commits:

9f7cd19 [zsxwing] Merge branch 'master' into SPARK-6939
deacc3f [zsxwing] Remove unused import
cd03424 [zsxwing] Fix .rat-excludes
70cc87d [zsxwing] Streaming Scheduling Delay => Scheduling Delay
d457277 [zsxwing] Fix UIUtils in BatchPage
b3f303e [zsxwing] Add comments for unclear classes and methods
ff0bff8 [zsxwing] Make InputDStream.name private[streaming]
cc392c5 [zsxwing] Merge branch 'master' into SPARK-6939
e275e23 [zsxwing] Move time related methods to Streaming's UIUtils
d5d86f6 [zsxwing] Fix incorrect lastErrorTime
3be4b7a [zsxwing] Use InputInfo
b50fa32 [zsxwing] Jump to the batch page when clicking a point in the timeline graphs
203605d [zsxwing] Merge branch 'master' into SPARK-6939
74307cf [zsxwing] Reuse the data for histogram graphs to reduce the page size
2586916 [zsxwing] Merge branch 'master' into SPARK-6939
70d8533 [zsxwing] Remove BatchInfo.numRecords and a few renames
7bbdc0a [zsxwing] Hide the receiver sub table if no receiver
a2972e9 [zsxwing] Add some ui tests for StreamingPage
fd03ad0 [zsxwing] Add a test to verify no memory leak
4a8f886 [zsxwing] Merge branch 'master' into SPARK-6939
18607a1 [zsxwing] Merge branch 'master' into SPARK-6939
d0b0aec [zsxwing] Clean up the codes
a459f49 [zsxwing] Add a dash line to processing time graphs
8e4363c [zsxwing] Prepare for the demo
c81a1ee [zsxwing] Change time unit in the graphs automatically
4c0b43f [zsxwing] Update Streaming UI
04c7500 [zsxwing] Make the server and client use the same timezone
fed8219 [zsxwing] Move the x axis at the top and show a better tooltip
c23ce10 [zsxwing] Make two graphs close
d78672a [zsxwing] Make the X axis use the same range
881c907 [zsxwing] Use histogram for distribution
5688702 [zsxwing] Fix the unit test
ddf741a [zsxwing] Fix the unit test
ad93295 [zsxwing] Remove unnecessary codes
a0458f9 [zsxwing] Clean the codes
b82ed1e [zsxwing] Update the graphs as per comments
dd653a1 [zsxwing] Add timeline and histogram graphs for streaming statistics
Parent 47728db7
......@@ -643,6 +643,36 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
========================================================================
For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js):
========================================================================
Copyright (c) 2010-2015, Michael Bostock
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* The name Michael Bostock may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
......
/* ===========================================================
* bootstrap-tooltip.js v2.2.2
* http://twitter.github.com/bootstrap/javascript.html#tooltips
* bootstrap-tooltip.js v2.3.2
* http://getbootstrap.com/2.3.2/javascript.html#tooltips
* Inspired by the original jQuery.tipsy by Jason Frame
* ===========================================================
* Copyright 2012 Twitter, Inc.
* Copyright 2013 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -38,19 +38,27 @@
, init: function (type, element, options) {
var eventIn
, eventOut
, triggers
, trigger
, i
this.type = type
this.$element = $(element)
this.options = this.getOptions(options)
this.enabled = true
if (this.options.trigger == 'click') {
this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
} else if (this.options.trigger != 'manual') {
eventIn = this.options.trigger == 'hover' ? 'mouseenter' : 'focus'
eventOut = this.options.trigger == 'hover' ? 'mouseleave' : 'blur'
this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
triggers = this.options.trigger.split(' ')
for (i = triggers.length; i--;) {
trigger = triggers[i]
if (trigger == 'click') {
this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
} else if (trigger != 'manual') {
eventIn = trigger == 'hover' ? 'mouseenter' : 'focus'
eventOut = trigger == 'hover' ? 'mouseleave' : 'blur'
this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
}
}
this.options.selector ?
......@@ -59,7 +67,7 @@
}
, getOptions: function (options) {
options = $.extend({}, $.fn[this.type].defaults, options, this.$element.data())
options = $.extend({}, $.fn[this.type].defaults, this.$element.data(), options)
if (options.delay && typeof options.delay == 'number') {
options.delay = {
......@@ -72,7 +80,15 @@
}
, enter: function (e) {
var self = $(e.currentTarget)[this.type](this._options).data(this.type)
var defaults = $.fn[this.type].defaults
, options = {}
, self
this._options && $.each(this._options, function (key, value) {
if (defaults[key] != value) options[key] = value
}, this)
self = $(e.currentTarget)[this.type](options).data(this.type)
if (!self.options.delay || !self.options.delay.show) return self.show()
......@@ -97,14 +113,16 @@
, show: function () {
var $tip
, inside
, pos
, actualWidth
, actualHeight
, placement
, tp
, e = $.Event('show')
if (this.hasContent() && this.enabled) {
this.$element.trigger(e)
if (e.isDefaultPrevented()) return
$tip = this.tip()
this.setContent()
......@@ -116,19 +134,18 @@
this.options.placement.call(this, $tip[0], this.$element[0]) :
this.options.placement
inside = /in/.test(placement)
$tip
.detach()
.css({ top: 0, left: 0, display: 'block' })
.insertAfter(this.$element)
pos = this.getPosition(inside)
this.options.container ? $tip.appendTo(this.options.container) : $tip.insertAfter(this.$element)
pos = this.getPosition()
actualWidth = $tip[0].offsetWidth
actualHeight = $tip[0].offsetHeight
switch (inside ? placement.split(' ')[1] : placement) {
switch (placement) {
case 'bottom':
tp = {top: pos.top + pos.height, left: pos.left + pos.width / 2 - actualWidth / 2}
break
......@@ -143,11 +160,56 @@
break
}
$tip
.offset(tp)
.addClass(placement)
.addClass('in')
this.applyPlacement(tp, placement)
this.$element.trigger('shown')
}
}
, applyPlacement: function(offset, placement){
var $tip = this.tip()
, width = $tip[0].offsetWidth
, height = $tip[0].offsetHeight
, actualWidth
, actualHeight
, delta
, replace
$tip
.offset(offset)
.addClass(placement)
.addClass('in')
actualWidth = $tip[0].offsetWidth
actualHeight = $tip[0].offsetHeight
if (placement == 'top' && actualHeight != height) {
offset.top = offset.top + height - actualHeight
replace = true
}
if (placement == 'bottom' || placement == 'top') {
delta = 0
if (offset.left < 0){
delta = offset.left * -2
offset.left = 0
$tip.offset(offset)
actualWidth = $tip[0].offsetWidth
actualHeight = $tip[0].offsetHeight
}
this.replaceArrow(delta - width + actualWidth, actualWidth, 'left')
} else {
this.replaceArrow(actualHeight - height, actualHeight, 'top')
}
if (replace) $tip.offset(offset)
}
, replaceArrow: function(delta, dimension, position){
this
.arrow()
.css(position, delta ? (50 * (1 - delta / dimension) + "%") : '')
}
, setContent: function () {
......@@ -161,6 +223,10 @@
, hide: function () {
var that = this
, $tip = this.tip()
, e = $.Event('hide')
this.$element.trigger(e)
if (e.isDefaultPrevented()) return
$tip.removeClass('in')
......@@ -179,6 +245,8 @@
removeWithAnimation() :
$tip.detach()
this.$element.trigger('hidden')
return this
}
......@@ -193,11 +261,12 @@
return this.getTitle()
}
, getPosition: function (inside) {
return $.extend({}, (inside ? {top: 0, left: 0} : this.$element.offset()), {
width: this.$element[0].offsetWidth
, height: this.$element[0].offsetHeight
})
, getPosition: function () {
var el = this.$element[0]
return $.extend({}, (typeof el.getBoundingClientRect == 'function') ? el.getBoundingClientRect() : {
width: el.offsetWidth
, height: el.offsetHeight
}, this.$element.offset())
}
, getTitle: function () {
......@@ -215,6 +284,10 @@
return this.$tip = this.$tip || $(this.options.template)
}
, arrow: function(){
return this.$arrow = this.$arrow || this.tip().find(".tooltip-arrow")
}
, validate: function () {
if (!this.$element[0].parentNode) {
this.hide()
......@@ -236,8 +309,8 @@
}
, toggle: function (e) {
var self = $(e.currentTarget)[this.type](this._options).data(this.type)
self[self.tip().hasClass('in') ? 'hide' : 'show']()
var self = e ? $(e.currentTarget)[this.type](this._options).data(this.type) : this
self.tip().hasClass('in') ? self.hide() : self.show()
}
, destroy: function () {
......@@ -269,10 +342,11 @@
, placement: 'top'
, selector: false
, template: '<div class="tooltip"><div class="tooltip-arrow"></div><div class="tooltip-inner"></div></div>'
, trigger: 'hover'
, trigger: 'hover focus'
, title: ''
, delay: 0
, html: false
, container: false
}
......@@ -285,4 +359,3 @@
}
}(window.jQuery);
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.graph {
font: 10px sans-serif;
}
.axis path, .axis line {
fill: none;
stroke: gray;
shape-rendering: crispEdges;
}
.axis text {
fill: gray;
}
.tooltip-inner {
max-width: 500px !important; /* Make sure we only have a one-line tooltip */
}
.line {
fill: none;
stroke: #0088cc;
stroke-width: 1.5px;
}
.bar rect {
fill: #0088cc;
shape-rendering: crispEdges;
}
.bar rect:hover {
fill: #00c2ff;
}
.timeline {
width: 500px;
}
.histogram {
width: auto;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// timeFormat: StreamingPage.scala will generate a global "timeFormat" dictionary to store the time
// and its formatted string. Because we cannot specify a timezone in JavaScript, to make sure the
// server and client use the same timezone, we use the "timeFormat" dictionary to format all time
// values used in the graphs.
// A global margin left for all timeline graphs. It will be set in "registerTimeline". This will be
// used to align all timeline graphs.
var maxMarginLeftForTimeline = 0;
// The max X values for all histograms. It will be set in "registerHistogram".
var maxXForHistogram = 0;
var histogramBinCount = 10;
var yValueFormat = d3.format(",.2f");
// Show a tooltip "text" for "node"
function showBootstrapTooltip(node, text) {
$(node).tooltip({title: text, trigger: "manual", container: "body"});
$(node).tooltip("show");
}
// Hide the tooltip for "node"
function hideBootstrapTooltip(node) {
$(node).tooltip("destroy");
}
// Register a timeline graph. All timeline graphs should be registered before calling any
// "drawTimeline" so that we can determine the max left margin for all timeline graphs.
function registerTimeline(minY, maxY) {
var numOfChars = yValueFormat(maxY).length;
// A minimum width required to display "maxY" in the graph
var pxForMaxY = numOfChars * 8 + 10;
// Make sure we have enough space to show the ticks in the y axis of timeline
maxMarginLeftForTimeline = pxForMaxY > maxMarginLeftForTimeline ? pxForMaxY : maxMarginLeftForTimeline;
}
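The left-margin bookkeeping above can be exercised on its own. Below is a minimal standalone sketch (hypothetical helper names; `d3.format(",.2f")` approximated with `toLocaleString`) using the same 8px-per-character plus 10px padding estimate as `registerTimeline`:

```javascript
// Track the widest formatted y-axis label seen so far, so all timeline
// graphs can share one left margin and stay horizontally aligned.
var maxMarginLeft = 0;

// Approximation of d3.format(",.2f"): grouped thousands, two decimals.
function formatY(value) {
  return value.toLocaleString('en-US', {
    minimumFractionDigits: 2,
    maximumFractionDigits: 2
  });
}

function registerTimelineSketch(maxY) {
  // Estimate 8px per character plus 10px of padding for the tick label.
  var pxForMaxY = formatY(maxY).length * 8 + 10;
  if (pxForMaxY > maxMarginLeft) maxMarginLeft = pxForMaxY;
  return maxMarginLeft;
}
```

For example, registering a graph whose max y value is `1234.5` formats it as `"1,234.50"` (8 characters), so the shared margin grows to at least 74px.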
// Register a histogram graph. All histogram graphs should be registered before calling any
// "drawHistogram" so that we can determine the max X value for histograms.
function registerHistogram(values, minY, maxY) {
var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
// In d3's histogram layout, d.x/d.dx give the bin range (plotted on our y axis) and d.y gives the bin count (plotted on our x axis)
var maxX = d3.max(data, function(d) { return d.y; });
maxXForHistogram = maxX > maxXForHistogram ? maxX : maxXForHistogram;
}
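`registerHistogram` delegates binning to `d3.layout.histogram`; as a rough standalone sketch (uniform bins, hypothetical helper names, not d3's exact thresholds), the shared maximum bin count can be tracked like this:

```javascript
// Shared max bin count across all registered histograms, so they can share
// one x-axis range.
var maxXForHistogramSketch = 0;

// Bin "values" uniformly over [minY, maxY] into "binCount" bins and return
// the per-bin counts (a simplification of d3.layout.histogram).
function binCounts(values, minY, maxY, binCount) {
  var counts = new Array(binCount).fill(0);
  var step = (maxY - minY) / binCount;
  values.forEach(function (v) {
    // Clamp so v === maxY falls into the last bin instead of overflowing.
    var i = Math.min(binCount - 1, Math.floor((v - minY) / step));
    counts[i]++;
  });
  return counts;
}

function registerHistogramSketch(values, minY, maxY) {
  var maxBin = Math.max.apply(null, binCounts(values, minY, maxY, 10));
  if (maxBin > maxXForHistogramSketch) maxXForHistogramSketch = maxBin;
  return maxXForHistogramSketch;
}
```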
// Draw a line between (x1, y1) and (x2, y2)
function drawLine(svg, xFunc, yFunc, x1, y1, x2, y2) {
var line = d3.svg.line()
.x(function(d) { return xFunc(d.x); })
.y(function(d) { return yFunc(d.y); });
var data = [{x: x1, y: y1}, {x: x2, y: y2}];
svg.append("path")
.datum(data)
.style("stroke-dasharray", ("6, 6"))
.style("stroke", "lightblue")
.attr("class", "line")
.attr("d", line);
}
/**
* @param id the `id` used in the html `div` tag
* @param data the data for the timeline graph
* @param minX the min value of X axis
* @param maxX the max value of X axis
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
* @param batchInterval if specified, a dashed line is drawn at "batchInterval" in the graph
*/
function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
// Hide the right border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
d3.select(d3.select(id).node().parentNode)
.style("padding", "8px 0 8px 8px")
.style("border-right", "0px solid white");
var margin = {top: 20, right: 27, bottom: 30, left: maxMarginLeftForTimeline};
var width = 500 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;
var x = d3.scale.linear().domain([minX, maxX]).range([0, width]);
var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
var formatYValue = d3.format(",.2f");
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue);
var line = d3.svg.line()
.x(function(d) { return x(d.x); })
.y(function(d) { return y(d.y); });
var svg = d3.select(id).append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
// Only show the first and last time in the graph
xAxis.tickValues(x.domain());
svg.append("g")
.attr("class", "x axis")
.attr("transform", "translate(0," + height + ")")
.call(xAxis)
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
.append("text")
.attr("transform", "translate(0," + (-3) + ")")
.text(unitY);
if (batchInterval && batchInterval <= maxY) {
drawLine(svg, x, y, minX, batchInterval, maxX, batchInterval);
}
svg.append("path")
.datum(data)
.attr("class", "line")
.attr("d", line);
// Add points to the line. They are invisible at first; when the user moves the mouse
// over a point, it is displayed along with its details.
svg.selectAll(".point")
.data(data)
.enter().append("circle")
.attr("stroke", "white") // white and opacity = 0 make it invisible
.attr("fill", "white")
.attr("opacity", "0")
.attr("cx", function(d) { return x(d.x); })
.attr("cy", function(d) { return y(d.y); })
.attr("r", function(d) { return 3; })
.on('mouseover', function(d) {
var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
showBootstrapTooltip(d3.select(this).node(), tip);
// show the point
d3.select(this)
.attr("stroke", "steelblue")
.attr("fill", "steelblue")
.attr("opacity", "1");
})
.on('mouseout', function() {
hideBootstrapTooltip(d3.select(this).node());
// hide the point
d3.select(this)
.attr("stroke", "white")
.attr("fill", "white")
.attr("opacity", "0");
})
.on("click", function(d) {
window.location.href = "batch/?id=" + d.x;
});
}
/**
* @param id the `id` used in the html `div` tag
* @param values the data for the histogram graph
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
* @param batchInterval if specified, a dashed line is drawn at "batchInterval" in the graph
*/
function drawHistogram(id, values, minY, maxY, unitY, batchInterval) {
// Hide the left border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
d3.select(d3.select(id).node().parentNode)
.style("padding", "8px 8px 8px 0")
.style("border-left", "0px solid white");
var margin = {top: 20, right: 30, bottom: 30, left: 10};
var width = 300 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;
var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(0).tickFormat(function(d) { return ""; });
var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
var svg = d3.select(id).append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
if (batchInterval && batchInterval <= maxY) {
drawLine(svg, x, y, 0, batchInterval, maxXForHistogram, batchInterval);
}
svg.append("g")
.attr("class", "x axis")
.call(xAxis)
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
var bar = svg.selectAll(".bar")
.data(data)
.enter()
.append("g")
.attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx)) + ")";})
.attr("class", "bar").append("rect")
.attr("width", function(d) { return x(d.y); })
.attr("height", function(d) { return height - y(d.dx); })
.on('mouseover', function(d) {
var percent = yValueFormat(d.y * 100.0 / values.length) + "%";
var tip = d.y + " batches (" + percent + ") between " + yValueFormat(d.x) + " and " + yValueFormat(d.x + d.dx) + " " + unitY;
showBootstrapTooltip(d3.select(this).node(), tip);
})
.on('mouseout', function() {
hideBootstrapTooltip(d3.select(this).node());
});
if (batchInterval && batchInterval <= maxY) {
// Add the "stable" text to the graph below the batch interval line.
var stableXOffset = x(maxXForHistogram) - 20;
var stableYOffset = y(batchInterval) + 15;
svg.append("text")
.style("fill", "lightblue")
.attr("class", "stable-text")
.attr("text-anchor", "middle")
.attr("transform", "translate(" + stableXOffset + "," + stableYOffset + ")")
.text("stable")
.on('mouseover', function(d) {
var tip = "Processing Time <= Batch Interval (" + yValueFormat(batchInterval) +" " + unitY +")";
showBootstrapTooltip(d3.select(this).node(), tip);
})
.on('mouseout', function() {
hideBootstrapTooltip(d3.select(this).node());
});
}
}
$(function() {
function getParameterFromURL(param) {
var parameters = window.location.search.substring(1); // Remove "?"
var keyValues = parameters.split('&');
for (var i = 0; i < keyValues.length; i++) {
var paramKeyValue = keyValues[i].split('=');
if (paramKeyValue[0] == param) {
return paramKeyValue[1];
}
}
}
if (getParameterFromURL("show-streams-detail") == "true") {
// Show the details for all InputDStreams
$('#inputs-table').toggle('collapsed');
$('#triangle').html('&#9660;');
}
});
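For reference, in environments that support it, the hand-rolled query-string parser above could be replaced by the standard `URLSearchParams` API. A minimal sketch (hypothetical function name; note it returns `null`, not `undefined`, for a missing key):

```javascript
// Look up a single query-string parameter, e.g. from window.location.search.
// URLSearchParams strips a leading "?" automatically.
function getParameter(search, param) {
  return new URLSearchParams(search).get(param);
}
```

So `getParameter(window.location.search, "show-streams-detail")` would yield `"true"` for a URL ending in `?show-streams-detail=true`.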
......@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}
def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
......
......@@ -44,6 +44,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
/**
* The name of this InputDStream. By default, it's the class name with its id.
*/
private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
......
......@@ -32,6 +32,7 @@ case class ReceiverInfo(
active: Boolean,
location: String,
lastErrorMessage: String = "",
lastError: String = ""
lastError: String = "",
lastErrorTime: Long = -1L
) {
}
......@@ -155,10 +155,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
val lastErrorTime =
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
lastError = error, lastErrorTime = lastErrorTime)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
val lastErrorTime =
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
lastError = error, lastErrorTime = lastErrorTime)
}
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
......@@ -182,7 +188,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
oldInfo.copy(lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
}
receiverInfo(streamId) = newReceiverInfo
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
......
......@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
private[ui] abstract class BatchTableBase(tableId: String) {
......@@ -32,12 +32,12 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
<td sorttable_customkey={batchTime.toString}>
<a href={s"batch?id=$batchTime"}>
......@@ -107,7 +107,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
......
......@@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
......@@ -73,8 +73,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${SparkUIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
// In the first row, output op id and its information needs to be shown. In other rows, these
// cells will be taken up due to "rowspan".
......@@ -110,7 +110,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</td>
<td class="progress-cell">
{
UIUtils.makeProgressBar(
SparkUIUtils.makeProgressBar(
started = sparkJob.numActiveTasks,
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
......@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
......@@ -212,24 +212,24 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val formattedSchedulingDelay =
batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
batchUIData.schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Batch Duration: </strong>
{UIUtils.formatDuration(streamingListener.batchDuration)}
{SparkUIUtils.formatDuration(streamingListener.batchDuration)}
</li>
<li>
<strong>Input data size: </strong>
......@@ -259,6 +259,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val content = summary ++ jobTable
UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
......@@ -26,7 +26,7 @@ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobI
private[ui] case class BatchUIData(
val batchTime: Time,
val receiverNumRecords: Map[Int, Long],
val streamIdToNumRecords: Map[Int, Long],
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
......@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = receiverNumRecords.map(_._2).sum
def numRecords: Long = streamIdToNumRecords.values.sum
}
private[ui] object BatchUIData {
......
......@@ -29,7 +29,6 @@ import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
......@@ -38,7 +37,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
......@@ -145,7 +144,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
def numReceivers: Int = synchronized {
receiverInfos.size
}
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
......@@ -175,39 +176,42 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchUIData.toSeq
}
def processingDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.processingDelay)
def streamName(streamId: Int): Option[String] = {
ssc.graph.getInputStreamName(streamId)
}
def schedulingDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.schedulingDelay)
}
/**
* Return all InputDStream Ids
*/
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
def totalDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.totalDelay)
}
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatchInfos = retainedBatches.reverse.take(batchUIDataLimit)
val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
val streamIds = ssc.graph.getInputStreams().map(_.id)
streamIds.map { id =>
val recordsOfParticularReceiver =
latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
val distribution = Distribution(recordsOfParticularReceiver)
(id, distribution)
/**
* Return the event rate of each InputDStream in each batch. The key of the return value is
* the stream id, and the value is a sequence of (batch time, event rate) pairs.
*/
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
}
streamIds.map { streamId =>
val eventRates = latestBatches.map {
case (batchTime, streamIdToNumRecords) =>
val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
(batchTime, numRecords * 1000.0 / batchDuration)
}
(streamId, eventRates)
}.toMap
}
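The per-stream event rate computed above is just the batch's record count scaled from records-per-batch to events-per-second. A minimal standalone sketch of that calculation (the names `EventRateSketch` and `eventRates` are illustrative, not Spark API; batch times and the batch duration are assumed to be in milliseconds, as in `StreamingContext`):

```scala
// Sketch of the event-rate derivation used by the listener:
// rate = numRecords * 1000.0 / batchDurationMs  (events per second).
object EventRateSketch {
  def eventRates(
      batches: Seq[(Long, Map[Int, Long])],  // (batchTime ms, streamId -> numRecords)
      streamIds: Seq[Int],
      batchDurationMs: Long): Map[Int, Seq[(Long, Double)]] = {
    streamIds.map { streamId =>
      val rates = batches.map { case (batchTime, counts) =>
        // Streams with no records in this batch contribute a rate of 0.
        (batchTime, counts.getOrElse(streamId, 0L) * 1000.0 / batchDurationMs)
      }
      (streamId, rates)
    }.toMap
  }
}
```

For example, a 2000 ms batch carrying 400 records for stream 0 yields a rate of 200 events/sec for that stream, and 0 for any stream absent from the map.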
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceiverNumRecords = lastReceivedBatch.map(_.receiverNumRecords)
val streamIds = ssc.graph.getInputStreams().map(_.id)
lastReceiverNumRecords.map { receiverNumRecords =>
streamIds.map { id =>
(id, receiverNumRecords.getOrElse(id, 0L))
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
streamIds.map { streamId =>
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
}.toMap
}.getOrElse {
streamIds.map(id => (id, 0L)).toMap
streamIds.map(streamId => (streamId, 0L)).toMap
}
}
......@@ -215,10 +219,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
def receiverIds(): Iterable[Int] = synchronized {
receiverInfos.keys
}
def lastCompletedBatch: Option[BatchUIData] = synchronized {
completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
......@@ -227,15 +227,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
retainedBatches.lastOption
}
private def retainedBatches: Seq[BatchUIData] = {
def retainedBatches: Seq[BatchUIData] = synchronized {
(waitingBatchUIData.values.toSeq ++
runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
}
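The now-public `retainedBatches` simply concatenates the waiting, running, and completed collections and sorts by batch time, so callers see one chronological view. A condensed sketch under simplified assumptions (`Batch` stands in for `BatchUIData`, and a plain `Long` stands in for `Time`):

```scala
// Merge the three batch collections into a single sequence ordered by batch time.
case class Batch(batchTime: Long)

object RetainedSketch {
  def retained(
      waiting: Seq[Batch], running: Seq[Batch], completed: Seq[Batch]): Seq[Batch] =
    (waiting ++ running ++ completed).sortBy(_.batchTime)
}
```

In the real listener this runs under `synchronized` because the three underlying collections are mutated by listener callbacks on another thread.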
private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
}
def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
val batchUIData = waitingBatchUIData.get(batchTime).orElse {
runningBatchUIData.get(batchTime).orElse {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
object UIUtils {
/**
* Return the short string for a `TimeUnit`.
*/
def shortTimeUnitString(unit: TimeUnit): String = unit match {
case TimeUnit.NANOSECONDS => "ns"
case TimeUnit.MICROSECONDS => "us"
case TimeUnit.MILLISECONDS => "ms"
case TimeUnit.SECONDS => "sec"
case TimeUnit.MINUTES => "min"
case TimeUnit.HOURS => "hrs"
case TimeUnit.DAYS => "days"
}
/**
* Find the best `TimeUnit` for rendering a duration in milliseconds as a human-friendly
* string. Return the converted value together with its `TimeUnit`.
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
return (milliseconds, TimeUnit.MILLISECONDS)
}
val seconds = milliseconds.toDouble / 1000
if (seconds < 60) {
return (seconds, TimeUnit.SECONDS)
}
val minutes = seconds / 60
if (minutes < 60) {
return (minutes, TimeUnit.MINUTES)
}
val hours = minutes / 60
if (hours < 24) {
return (hours, TimeUnit.HOURS)
}
val days = hours / 24
(days, TimeUnit.DAYS)
}
/**
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
* will discard the fractional part.
*/
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
case TimeUnit.MICROSECONDS => milliseconds * 1000
case TimeUnit.MILLISECONDS => milliseconds
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
}
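To make the unit-selection behavior concrete, here is a minimal standalone re-implementation of the `normalizeDuration` logic above (the name `DurationSketch` is illustrative): the method walks up through seconds, minutes, and hours, stopping at the largest unit for which the converted value stays below the next threshold.

```scala
import java.util.concurrent.TimeUnit

object DurationSketch {
  // Mirrors normalizeDuration: divide through successive unit sizes and stop
  // once the value would no longer fill a whole larger unit.
  def normalize(ms: Long): (Double, TimeUnit) = {
    if (ms < 1000) return (ms.toDouble, TimeUnit.MILLISECONDS)
    val seconds = ms / 1000.0
    if (seconds < 60) return (seconds, TimeUnit.SECONDS)
    val minutes = seconds / 60
    if (minutes < 60) return (minutes, TimeUnit.MINUTES)
    val hours = minutes / 60
    if (hours < 24) return (hours, TimeUnit.HOURS)
    (hours / 24, TimeUnit.DAYS)
  }
}
```

So 90000 ms normalizes to (1.5, MINUTES) rather than being truncated to 1 minute, which is why the UI cannot use `TimeUnit.convert` here.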
......@@ -94,19 +94,34 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
// check whether streaming page exists
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
statisticText should contain("Network receivers:")
statisticText should contain("Batch interval:")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should contain("Streaming Statistics")
// Check stat table
val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq
statTableHeaders.exists(
_.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be (true)
statTableHeaders should contain ("Histograms")
val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq
statTableCells.exists(_.contains("Input Rate")) should be (true)
statTableCells.exists(_.contains("Scheduling Delay")) should be (true)
statTableCells.exists(_.contains("Processing Time")) should be (true)
statTableCells.exists(_.contains("Total Delay")) should be (true)
// Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
"Status")
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
"Total Delay")
}
val batchLinks =
......@@ -176,9 +191,8 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
statisticText should not contain ("Network receivers:")
statisticText should not contain ("Batch interval:")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should not contain("Streaming Statistics")
}
}
}
......
......@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be
Seq(OutputOpIdAndSparkJobId(0, 0),
......@@ -138,7 +138,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("Remove the old completed batches when exceeding the limit") {
val ssc = setupStreams(input, operation)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
......@@ -155,7 +155,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("out-of-order onJobStart and onBatchXXX") {
val ssc = setupStreams(input, operation)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
// fulfill completedBatchInfos
......@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map.empty)
batchUIData.get.streamIdToNumRecords should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
......@@ -203,4 +203,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}
test("detect memory leak") {
val ssc = setupStreams(input, operation)
val listener = new StreamingJobProgressListener(ssc)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
for (_ <- 0 until 2 * limit) {
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
// onBatchStarted
val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
// onJobStart
val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
listener.onJobStart(jobStart1)
val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart2)
val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
listener.onJobStart(jobStart3)
val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
listener.onJobStart(jobStart4)
// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}
listener.waitingBatches.size should be (0)
listener.runningBatches.size should be (0)
listener.retainedCompletedBatches.size should be (limit)
listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
import org.scalatest.FunSuite
import org.scalatest.Matchers
class UIUtilsSuite extends FunSuite with Matchers {
test("shortTimeUnitString") {
assert("ns" === UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS))
assert("us" === UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS))
assert("ms" === UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS))
assert("sec" === UIUtils.shortTimeUnitString(TimeUnit.SECONDS))
assert("min" === UIUtils.shortTimeUnitString(TimeUnit.MINUTES))
assert("hrs" === UIUtils.shortTimeUnitString(TimeUnit.HOURS))
assert("days" === UIUtils.shortTimeUnitString(TimeUnit.DAYS))
}
test("normalizeDuration") {
verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
}
private def verifyNormalizedTime(
expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
val (time, unit) = UIUtils.normalizeDuration(input)
time should be (expectedTime +- 1E-6)
unit should be (expectedUnit)
}
test("convertToTimeUnit") {
verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
}
private def verifyConvertToTimeUnit(
expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
convertedTime should be (expectedTime +- 1E-6)
}
}