你可以很容易地使用 DataFrames 和 SQL Streaming 操作数据。 需要使用 SparkContext 或者正在使用的 StreamingContext 创建一个 SparkSession。这样做的目的就是为了使得驱动程序可以在失败之后进行重启。 使用懒加载模式创建单例的 SparkSession 对象。下面的示例所示。在原先的 单词统计 程序的基础上进行修改,使用 DataFrames 和 SQL 生成单词统计。 每个 RDD 转换为 DataFrame,注册为临时表,然后使用 SQL 查询。
/** 流程序中的DataFrame操作 */ val words: DStream[String] = ... words.foreachRDD { rdd => // 获取单例的SQLContext val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 将RDD [String]转换为DataFrame val wordsDataFrame = rdd.toDF("word") // 注册临时表 wordsDataFrame.registerTempTable("words") // 在DataFrame上使用SQL进行字计数并打印它 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
这里是完整 源代码 。
你可以在使用其他线程读取的流数据上进行 SQL 查询(就是说,可以异步运行 StreamingContext)。 只要确保 StreamingContext 可以缓存一定量的数据来满足查询的需求。 否则 StreamingContext,检测不到任何异步 SQL 查询,在完成查询之前将删除旧的数据。 例如,如果您想查询最后一批,但您的查询可以运行需要 5 分钟,然后调用 streamingContext.remember(Minutes(5))(
在 Scala 中,或其他语言)。
请看 DataFrames,Datasets 和 SQL 指南学习更多关于 DataFrames 的知识。