提交 1528ff4c 编写于 作者: L Liang-Chi Hsieh 提交者: Yin Huai

[SPARK-14156][SQL] Use executedPlan in HiveComparisonTest for the messages of computed tables

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-14156

In HiveComparisonTest, when catalyst results are different to hive results, we will collect the messages for computed tables during the test. During creating the message, we use sparkPlan. But we actually run the query with executedPlan. So the error message is sometimes confusing.

For example, as wholestage codegen is enabled by default now. The shown spark plan for computed tables is the plan before wholestage codegen.

A concrete is the following error message shown before this patch. It is the error shown when running `HiveCompatibilityTest` `auto_join26`.

auto_join26 has one SQL to create table:

    INSERT OVERWRITE TABLE dest_j1
    SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;   (1)

Then a SQL to retrieve the result:

    select * from dest_j1 x order by x.key;   (2)

When the above SQL (2) to retrieve the result fails, In `HiveComparisonTest` we will try to collect and show the generated data from table `dest_j1` using the SQL (1)'s spark plan. The you will see this error:

    TungstenAggregate(key=[key#8804], functions=[(count(1),mode=Partial,isDistinct=false)], output=[key#8804,count#8834L])
    +- Project [key#8804]
       +- BroadcastHashJoin [key#8804], [key#8806], Inner, BuildRight, None
          :- Filter isnotnull(key#8804)
          :  +- InMemoryColumnarTableScan [key#8804], [isnotnull(key#8804)], InMemoryRelation [key#8804,value#8805], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8717,value#8718], MetastoreRelation default, src1, None, Some(src1)
          +- Filter isnotnull(key#8806)
             +- InMemoryColumnarTableScan [key#8806], [isnotnull(key#8806)], InMemoryRelation [key#8806,value#8807], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8760,value#8761], MetastoreRelation default, src, None, Some(src)

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:140)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:137)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:120)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:87)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:82)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
	... 70 more
    Caused by: java.lang.UnsupportedOperationException: Filter does not implement doExecuteBroadcast
	at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:221)

The message is confusing because it is not the plan actually run by SparkSQL engine to create the generated table. The plan actually run is no problem. But as before this patch, we run `e.sparkPlan.collect` to retrieve and show the generated data, spark plan is not the plan we can run. So the above error will be shown.

After this patch, we won't see the error because the executed plan is no problem and works.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11957 from viirya/use-executedplan.
上级 4a7636f2
......@@ -480,7 +480,11 @@ abstract class HiveComparisonTest
val executions = queryList.map(new TestHive.QueryExecution(_))
executions.foreach(_.toRdd)
val tablesGenerated = queryList.zip(executions).flatMap {
case (q, e) => e.sparkPlan.collect {
// We should take executedPlan instead of sparkPlan, because in following codes we
// will run the collected plans. As we will do extra processing for sparkPlan such
// as adding exchage, collapsing codegen stages, etc., collecing sparkPlan here
// will cause some errors when running these plans later.
case (q, e) => e.executedPlan.collect {
case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
(q, e, i)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册