diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java index 93fe716bd185065a6cdd2e790699adc3ee10139c..f664e6110ad01414c7b77cc320993a3741333875 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java @@ -328,7 +328,7 @@ public class JobClient { continue; } - this.console.println(event.toString()); + LOG.info(event.toString()); this.lastProcessedEventSequenceNumber = event.getSequenceNumber(); diff --git a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/ScalaPlan.scala b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/ScalaPlan.scala index af2aa971fd8554fbb7481f67e1f22425f8025f41..8ab794bd87d10c14a63f3b7622c7e338b43563f5 100644 --- a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/ScalaPlan.scala +++ b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/ScalaPlan.scala @@ -26,7 +26,7 @@ import eu.stratosphere.pact.common.plan.PlanAssemblerDescription class ScalaPlan(scalaSinks: Seq[ScalaSink[_]], scalaJobName: String = "PACT SCALA Job at " + Calendar.getInstance().getTime()) extends Plan(asJavaCollection(scalaSinks map { _.sink }), scalaJobName) { val pactSinks = scalaSinks map { _.sink.asInstanceOf[Contract with ScalaContract[_]] } - GlobalSchemaGenerator.initGlobalSchema(pactSinks) + new GlobalSchemaGenerator().initGlobalSchema(pactSinks) override def getPostPassClassName() = "eu.stratosphere.scala.ScalaPostPass"; } diff --git a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaGenerator.scala b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaGenerator.scala index 1b66a50cf1a64c50ad5e40c5c64bc1475218acbc..11e9b1ba4854e03ffbc672b3c768ad58eb0dd636 100644 --- a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaGenerator.scala +++ b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaGenerator.scala @@ -39,7 +39,7 @@ import eu.stratosphere.pact.generic.contract.WorksetIteration import eu.stratosphere.scala.WorksetIterationScalaContract import eu.stratosphere.pact.common.contract.GenericDataSink -object GlobalSchemaGenerator { +class GlobalSchemaGenerator { def initGlobalSchema(sinks: Seq[Contract with ScalaContract[_]]): Unit = { diff --git a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaPrinter.scala b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaPrinter.scala index 5ed23831802267c410abc1aad4a483e2f90fe81e..f0de1fb5d7a873f0e7b22ee0347bf0b158f877a8 100644 --- a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaPrinter.scala +++ b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/GlobalSchemaPrinter.scala @@ -13,7 +13,7 @@ package eu.stratosphere.scala.analysis -import scala.Array.canBuildFrom + import scala.collection.JavaConversions.asScalaBuffer import scala.collection.JavaConversions.collectionAsScalaIterable import Extractors.CoGroupNode @@ -23,6 +23,7 @@ import Extractors.DataSourceNode import Extractors.JoinNode import Extractors.MapNode import Extractors.ReduceNode +import eu.stratosphere.scala.analysis.GlobalSchemaGenerator import eu.stratosphere.pact.common.contract.GenericDataSink import eu.stratosphere.pact.common.plan.Plan import eu.stratosphere.pact.generic.contract.Contract @@ -30,17 +31,19 @@ import eu.stratosphere.pact.generic.contract.DualInputContract import eu.stratosphere.pact.generic.contract.SingleInputContract import eu.stratosphere.pact.generic.contract.BulkIteration import eu.stratosphere.pact.generic.contract.WorksetIteration +import org.apache.commons.logging.{LogFactory, Log} object GlobalSchemaPrinter { import Extractors._ + private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaGenerator]) + def printSchema(plan: Plan): Unit = { - println("### " + plan.getJobName + " ###") + LOG.debug("### " + plan.getJobName + " ###") plan.getDataSinks.foldLeft(Set[Contract]())(printSchema) - println("####" + ("#" * plan.getJobName.length) + "####") - println() + LOG.debug("####" + ("#" * plan.getJobName.length) + "####") } private def printSchema(visited: Set[Contract], node: Contract): Set[Contract] = { @@ -201,6 +204,6 @@ object GlobalSchemaPrinter { val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", " val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", " - println(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites)) + LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites)) } } diff --git a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/postPass/GlobalSchemaPrinter.scala b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/postPass/GlobalSchemaPrinter.scala index f35c4092f62ce9b7351b283e7f9a69aa5c78b1f0..050663dc794f7d84dea821ef1ca925a359c8861b 100644 --- a/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/postPass/GlobalSchemaPrinter.scala +++ b/pact-scala/pact-scala-core/src/main/scala/eu/stratosphere/scala/analysis/postPass/GlobalSchemaPrinter.scala @@ -29,22 +29,25 @@ import eu.stratosphere.pact.compiler.plan.SinkJoiner import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan import eu.stratosphere.scala.analysis.FieldSet import eu.stratosphere.scala.analysis.FieldSelector +import eu.stratosphere.scala.analysis.postPass.GlobalSchemaOptimizer import eu.stratosphere.pact.common.plan.Plan import eu.stratosphere.pact.generic.contract.Contract import eu.stratosphere.pact.generic.contract.SingleInputContract import eu.stratosphere.pact.generic.contract.DualInputContract import eu.stratosphere.pact.common.contract.GenericDataSink +import org.apache.commons.logging.{LogFactory, Log} object GlobalSchemaPrinter { import Extractors._ + private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaOptimizer]) + def printSchema(plan: OptimizedPlan): Unit = { - println("### " + plan.getJobName + " ###") + LOG.debug("### " + plan.getJobName + " ###") plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(printSchema) - println("####" + ("#" * plan.getJobName.length) + "####") - println() + LOG.debug("####" + ("#" * plan.getJobName.length) + "####") } private def printSchema(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = { @@ -59,7 +62,7 @@ object GlobalSchemaPrinter { val newVisited = children.foldLeft(visited + node)(printSchema) node match { - + case _: SinkJoiner | _: BinaryUnionNode => case DataSinkNode(udf, input) => { @@ -121,7 +124,7 @@ object GlobalSchemaPrinter { udf.outputFields ) } - + case UnionNode(udf, input) => { printInfo(node, "Union", Seq(), @@ -134,7 +137,7 @@ object GlobalSchemaPrinter { case ReduceNode(udf, key, input) => { -// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]] +// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]] // contract.userCombineCode map { _ => // printInfo(node, "Combine", // Seq(("", key)), @@ -197,6 +200,6 @@ object GlobalSchemaPrinter { val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", " val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", " - println(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites)) + LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites)) } } diff --git a/pact-scala/pact-scala-examples/src/main/scala/eu/stratosphere/scala/examples/grabbag/Grabbag.scala b/pact-scala/pact-scala-examples/src/main/scala/eu/stratosphere/scala/examples/grabbag/Grabbag.scala index 71cf725ddbe3fe0ffd4185bd4eef276cea8f04cd..59faaa77feb0b7a283d37ec91f1ba7b9d2769053 100644 --- a/pact-scala/pact-scala-examples/src/main/scala/eu/stratosphere/scala/examples/grabbag/Grabbag.scala +++ b/pact-scala/pact-scala-examples/src/main/scala/eu/stratosphere/scala/examples/grabbag/Grabbag.scala @@ -12,6 +12,10 @@ import eu.stratosphere.scala.operators._ import eu.stratosphere.scala.analysis.GlobalSchemaPrinter import eu.stratosphere.pact.example.util.AsciiUtils +import org.apache.log4j.Logger +import org.apache.log4j.Level +import eu.stratosphere.scala.analysis.postPass.GlobalSchemaOptimizer +import eu.stratosphere.scala.analysis.GlobalSchemaGenerator // Grab bag of random scala examples @@ -33,6 +37,10 @@ object Main1 { def addCounts(w1: (String, Int), w2: (String, Int)) = (w1._1, w1._2 + w2._2) def main(args: Array[String]) { +// var logger = Logger.getLogger(classOf[GlobalSchemaOptimizer]) +// logger.setLevel(Level.DEBUG) +// logger = Logger.getLogger(classOf[GlobalSchemaGenerator]) +// logger.setLevel(Level.DEBUG) def formatOutput = (word: String, count: Int) => "%s %d".format(word, count) diff --git a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/LocalExecutor.java b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/LocalExecutor.java index 9d2b7b810d3dbfe2a787344332e7fe6635c800ee..6fbb5cb908f1d695d31cb7f0170a00229ba15015 100644 --- a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/LocalExecutor.java +++ b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/LocalExecutor.java @@ -17,10 +17,7 @@ package eu.stratosphere.pact.client; import java.util.List; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; +import org.apache.log4j.*; import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.jobgraph.JobGraph; @@ -54,6 +51,7 @@ public class LocalExecutor implements PlanExecutor { Logger root = Logger.getRootLogger(); PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); + appender.setThreshold(Level.ERROR); root.addAppender(appender); root.setLevel(Level.WARN); }