Commit 74267beb, authored by mwws, committed by Sean Owen

[SPARK-13758][STREAMING][CORE] enhance exception message to avoid misleading

We have a recoverable Spark Streaming job with checkpointing enabled. It runs correctly the first time, but throws the following exception when it is restarted and recovers from the checkpoint.
```
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
 	at org.apache.spark.rdd.RDD.union(RDD.scala:565)
 	at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
 	at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
```

The exception says that I invoked transformations and actions inside other transformations, but I did not. The real cause is that I used an external RDD in a DStream operation. The external RDD's data is not stored in the checkpoint, so during recovery the _sc field of this RDD is initialized to null, which triggers the exception above. The error message is therefore misleading: it says nothing about the real issue.
Here is the code to reproduce it:

```scala
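import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Repo {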

  def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

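    // External RDD: created outside the DStream graph, so its data is not stored in the
    // streaming checkpoint. Using it inside foreachRDD is what breaks recovery.
    var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))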

    val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
    words.foreachRDD((rdd: RDD[String]) => {
      val res = rdd.map(word => (word, word.length)).collect()
      println("words: " + res.mkString(", "))

      cached = cached.union(rdd)
      cached.checkpoint()
      println("cached words: " + cached.collect.mkString(", "))
    })
    ssc
  }

  def main(args: Array[String]): Unit = {

    val ip = "localhost"
    val port = 9999
    val dir = "/home/maowei/tmp"

    val ssc = StreamingContext.getOrCreate(dir,
      () => {
        createContext(ip, port, dir)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
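The null _sc described above can be illustrated without Spark. The sketch below is not Spark code: `FakeContext` and `FakeRDD` are hypothetical stand-ins, and it assumes (as the recovery behaviour suggests) that the RDD keeps its SparkContext in a `@transient` field, so a Java serialization round trip, such as writing a closure to a checkpoint and reading it back, leaves that field null.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext; deliberately not Serializable.
class FakeContext(val name: String)

// Hypothetical stand-in for an RDD. The context is a @transient field,
// so Java serialization skips it (assumption: this mirrors RDD's _sc).
class FakeRDD(@transient private var _ctx: FakeContext, val data: Seq[String])
    extends Serializable {
  def hasContext: Boolean = _ctx != null
}

object TransientDemo {
  // Serialize a value to bytes and read it back, roughly what happens to
  // objects captured by a closure that is saved in and restored from a checkpoint.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRDD(new FakeContext("driver"), Seq("apple", "banana"))
    val recovered = roundTrip(original)
    println(s"original has context:  ${original.hasContext}")
    println(s"recovered has context: ${recovered.hasContext}")
  }
}
```

The data survives the round trip but the context reference does not, which is the state in which the real `RDD.sc` accessor throws the exception shown earlier.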

Author: mwws <wei.mao@intel.com>

Closes #11595 from mwws/SPARK-MissleadingLog.
Parent 927e22ef
@@ -85,10 +85,14 @@ abstract class RDD[T: ClassTag](
   private def sc: SparkContext = {
     if (_sc == null) {
       throw new SparkException(
-        "RDD transformations and actions can only be invoked by the driver, not inside of other " +
-        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because " +
-        "the values transformation and count action cannot be performed inside of the rdd1.map " +
-        "transformation. For more information, see SPARK-5063.")
+        "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
+        "transformations and actions are NOT invoked by the driver, but inside of other " +
+        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
+        "because the values transformation and count action cannot be performed inside of the " +
+        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
+        "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
+        "an RDD not defined by the streaming job is used in DStream operations. For more " +
+        "information, See SPARK-13758.")
     }
     _sc
   }