提交 14a6cb93 编写于 作者: A Aljoscha Krettek 提交者: StephanEwen

Fix some log/debugging output

Make the scala schema printers use log4j

Fix the additional syserr appender in LocalExecutor: add a threshold so
that it does only output error messages

Make JobClient use log4j for the event output that was annoying me
上级 2d7801d3
......@@ -328,7 +328,7 @@ public class JobClient {
continue;
}
this.console.println(event.toString());
LOG.info(event.toString());
this.lastProcessedEventSequenceNumber = event.getSequenceNumber();
......
......@@ -26,7 +26,7 @@ import eu.stratosphere.pact.common.plan.PlanAssemblerDescription
class ScalaPlan(scalaSinks: Seq[ScalaSink[_]], scalaJobName: String = "PACT SCALA Job at " + Calendar.getInstance().getTime()) extends Plan(asJavaCollection(scalaSinks map { _.sink }), scalaJobName) {
val pactSinks = scalaSinks map { _.sink.asInstanceOf[Contract with ScalaContract[_]] }
GlobalSchemaGenerator.initGlobalSchema(pactSinks)
new GlobalSchemaGenerator().initGlobalSchema(pactSinks)
override def getPostPassClassName() = "eu.stratosphere.scala.ScalaPostPass";
}
......
......@@ -39,7 +39,7 @@ import eu.stratosphere.pact.generic.contract.WorksetIteration
import eu.stratosphere.scala.WorksetIterationScalaContract
import eu.stratosphere.pact.common.contract.GenericDataSink
object GlobalSchemaGenerator {
class GlobalSchemaGenerator {
def initGlobalSchema(sinks: Seq[Contract with ScalaContract[_]]): Unit = {
......
......@@ -13,7 +13,7 @@
package eu.stratosphere.scala.analysis
import scala.Array.canBuildFrom
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.collectionAsScalaIterable
import Extractors.CoGroupNode
......@@ -23,6 +23,7 @@ import Extractors.DataSourceNode
import Extractors.JoinNode
import Extractors.MapNode
import Extractors.ReduceNode
import eu.stratosphere.scala.analysis.GlobalSchemaGenerator
import eu.stratosphere.pact.common.contract.GenericDataSink
import eu.stratosphere.pact.common.plan.Plan
import eu.stratosphere.pact.generic.contract.Contract
......@@ -30,17 +31,19 @@ import eu.stratosphere.pact.generic.contract.DualInputContract
import eu.stratosphere.pact.generic.contract.SingleInputContract
import eu.stratosphere.pact.generic.contract.BulkIteration
import eu.stratosphere.pact.generic.contract.WorksetIteration
import org.apache.commons.logging.{LogFactory, Log}
object GlobalSchemaPrinter {
import Extractors._
private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaGenerator])
def printSchema(plan: Plan): Unit = {
println("### " + plan.getJobName + " ###")
LOG.debug("### " + plan.getJobName + " ###")
plan.getDataSinks.foldLeft(Set[Contract]())(printSchema)
println("####" + ("#" * plan.getJobName.length) + "####")
println()
LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
}
private def printSchema(visited: Set[Contract], node: Contract): Set[Contract] = {
......@@ -201,6 +204,6 @@ object GlobalSchemaPrinter {
val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
println(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
}
}
......@@ -29,22 +29,25 @@ import eu.stratosphere.pact.compiler.plan.SinkJoiner
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan
import eu.stratosphere.scala.analysis.FieldSet
import eu.stratosphere.scala.analysis.FieldSelector
import eu.stratosphere.scala.analysis.postPass.GlobalSchemaOptimizer
import eu.stratosphere.pact.common.plan.Plan
import eu.stratosphere.pact.generic.contract.Contract
import eu.stratosphere.pact.generic.contract.SingleInputContract
import eu.stratosphere.pact.generic.contract.DualInputContract
import eu.stratosphere.pact.common.contract.GenericDataSink
import org.apache.commons.logging.{LogFactory, Log}
object GlobalSchemaPrinter {
import Extractors._
private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaOptimizer])
def printSchema(plan: OptimizedPlan): Unit = {
println("### " + plan.getJobName + " ###")
LOG.debug("### " + plan.getJobName + " ###")
plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(printSchema)
println("####" + ("#" * plan.getJobName.length) + "####")
println()
LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
}
private def printSchema(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
......@@ -59,7 +62,7 @@ object GlobalSchemaPrinter {
val newVisited = children.foldLeft(visited + node)(printSchema)
node match {
case _: SinkJoiner | _: BinaryUnionNode =>
case DataSinkNode(udf, input) => {
......@@ -121,7 +124,7 @@ object GlobalSchemaPrinter {
udf.outputFields
)
}
case UnionNode(udf, input) => {
printInfo(node, "Union",
Seq(),
......@@ -134,7 +137,7 @@ object GlobalSchemaPrinter {
case ReduceNode(udf, key, input) => {
// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]]
// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]]
// contract.userCombineCode map { _ =>
// printInfo(node, "Combine",
// Seq(("", key)),
......@@ -197,6 +200,6 @@ object GlobalSchemaPrinter {
val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
println(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
}
}
......@@ -12,6 +12,10 @@ import eu.stratosphere.scala.operators._
import eu.stratosphere.scala.analysis.GlobalSchemaPrinter
import eu.stratosphere.pact.example.util.AsciiUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
import eu.stratosphere.scala.analysis.postPass.GlobalSchemaOptimizer
import eu.stratosphere.scala.analysis.GlobalSchemaGenerator
// Grab bag of random scala examples
......@@ -33,6 +37,10 @@ object Main1 {
def addCounts(w1: (String, Int), w2: (String, Int)) = (w1._1, w1._2 + w2._2)
def main(args: Array[String]) {
// var logger = Logger.getLogger(classOf[GlobalSchemaOptimizer])
// logger.setLevel(Level.DEBUG)
// logger = Logger.getLogger(classOf[GlobalSchemaGenerator])
// logger.setLevel(Level.DEBUG)
def formatOutput = (word: String, count: Int) => "%s %d".format(word, count)
......
......@@ -17,10 +17,7 @@ package eu.stratosphere.pact.client;
import java.util.List;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.*;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
......@@ -54,6 +51,7 @@ public class LocalExecutor implements PlanExecutor {
Logger root = Logger.getRootLogger();
PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
appender.setThreshold(Level.ERROR);
root.addAppender(appender);
root.setLevel(Level.WARN);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册