name := "NetflixRecommender"
# Spark 推荐系统
version := "1.0"
## 离线
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.0"
libraryDependencies += "org.mongodb" %% "casbah" % "3.0.0"
libraryDependencies += "org.jblas" % "jblas" % "1.2.4"
## 实时
mergeStrategy in assembly <<= (mergeStrategy in assembly) { mergeStrategy => {
case entry => {
val strategy = mergeStrategy(entry)
if (strategy == MergeStrategy.deduplicate) MergeStrategy.first
else strategy
## 总结
......@@ -3,8 +3,16 @@
mvn clean
mvn compile
mvn package
echo 'package success'
# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.App /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.WordCount /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
/opt/spark/bin/spark-submit --class apache.wiki.OfflineRecommender /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
if [ $? -eq 0 ]
echo "打包成功"
echo "打包失败"
exit 1
# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.App /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.WordCount /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
# /opt/spark/bin/spark-submit --class apache.wiki.OfflineRecommender /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
# /opt/spark/bin/spark-submit --class apache.wiki.StreamingWordCount /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
/opt/spark/bin/spark-submit --class apache.wiki.OnlineRecommender /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
echo 'spark-submit success'
"resources": {
"local_catch_file": "log/cache.txt",
"local_schema_file": "conf/ad_data.avsc"
"topics": "datasys",
"consumer_group": "datasys_kafka2hive",
"brokerHosts": "zk1.common.ad.m.com:9092,zk2.common.ad.m.com:9092,zk3.common.ad.m.com:9092",
"zkHosts": "zk1.common.ad.m.com:2181,zk2.common.ad.m.com:2181,zk3.common.ad.m.com:2181",
"message_num": 400,
"root_dir": "/var/tmp"
"hdfs": {
"hdfs_name": "hadoopuser",
"hdfs_port": 50070,
"hdfs_host": "BJSH-ADHBASE-134-128.meitu-inc.com",
"hdfs_path": "/user/hadoopuser/jiangzl"
"hive": {
"hive_port": 10000,
"hive_host": "BJSH-ADHBASE-134-128.meitu-inc.com"
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<description>My wonderfull scala app</description>
......@@ -78,6 +78,11 @@
package apache.wiki
import java.io.File
import java.nio.charset.Charset
import com.google.common.io.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.{IntParam, LongAccumulator}
* @author ${user.name}
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
* See LICENSE file for further information.
* 参考地址
* GitHub: https://github.com/apachecn/RecommenderSystems
* https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
return instance
* Use this singleton to get or register an Accumulator.
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
return instance
object CheckPointWordCount{
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
// 如果已存在CheckPoint,就不进入该方法
println("Creating new context")
// Streaming处理的结果存放位置
val outputFile = new File(outputPath.split(":")(1))
if (outputFile.exists()) outputFile.delete()
val conf = new SparkConf().setAppName("CheckPointWordCount")
// 默认本地模式运行
val isDebug = true
if (isDebug) {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(conf, Seconds(10))
// checkpoint存放位置
// 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999
val lines = ssc.socketTextStream(ip, port)
// 将每一行拆分成单词 val words = lines.flatMap(_.split(" "))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
* 累加器进行累加操作,blacklist.value的出现总次数
val counts = rdd.filter { case (word, count) =>
printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
if (blacklist.value.contains(word)) {
println("return false")
} else {
println("return true")
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(output + "\n", outputFile, Charset.defaultCharset())
return ssc
def main(args: Array[String]): Unit = {
val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
// 设置CheckPoint
val (ip, port, outputPath, checkpointDir) = ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(ip, port, outputPath, checkpointDir))
ssc.start() // 启动计算
ssc.awaitTermination() // 等待计算的终止
\ No newline at end of file
......@@ -15,7 +15,7 @@ import org.apache.spark.rdd.RDD
* See LICENSE file for further information.
* 参考地址
* GitHub:
* GitHub: https://github.com/apachecn/RecommenderSystems
* 推荐系统: http://www.kuqin.com/shuoit/20151202/349305.html
* ALS说明: http://www.csdn.net/article/2015-05-07/2824641
......@@ -107,7 +107,7 @@ object OfflineRecommender {
// 初始化 SparkContext
val conf = new SparkConf().setMaster("local").setAppName("OfflineRecommender")
val sc = new SparkContext(conf)
val base = if (args.length > 0) args(0) else "file:/opt/git/RecommendedSystem/"
val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
// 导入数据,获取RDD
// UserMovies格式: 用户名,电影ID,评分
......@@ -119,14 +119,14 @@ object OfflineRecommender {
preparation(rawUserMoviesData, rawHotMoviesData)
// 抽样评估推荐结果
model(sc, rawUserMoviesData, rawHotMoviesData)
// model(sc, rawUserMoviesData, rawHotMoviesData)
// 整体推荐评分的评估
evaluate(sc,rawUserMoviesData, rawHotMoviesData)
// evaluate(sc,rawUserMoviesData, rawHotMoviesData)
// recommend(sc, rawUserMoviesData, rawHotMoviesData, base)
recommend(sc, rawUserMoviesData, rawHotMoviesData, base)
// 每一个 JVM 可能只能激活一个 SparkContext 对象。在创新一个新的对象之前,必须调用 stop() 该方法停止活跃的 SparkContext。
// sc.stop()
package apache.wiki
// package apache.wiki
import scala.collection.Map
// import java.io.File
// import java.nio.charset.Charset
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
// import com.google.common.io.Files
// import org.apache.spark.{SparkConf, SparkContext}
// import org.apache.spark.broadcast.Broadcast
// import org.apache.spark.rdd.RDD
// import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
// import org.apache.spark.util.{IntParam, LongAccumulator}
// /**
// * @author ${user.name}
// * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
// *
// * See LICENSE file for further information.
// *
// * 参考地址
// * GitHub: https://github.com/apachecn/RecommenderSystems
// * https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
// */
* @author ${user.name}
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
* See LICENSE file for further information.
* 参考地址
* 推荐系统: http://www.kuqin.com/shuoit/20151202/349305.html
* ALS说明: http://www.csdn.net/article/2015-05-07/2824641
\ No newline at end of file
// object WordBlacklist {
// @volatile private var instance: Broadcast[Seq[String]] = null
// def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
// if (instance == null) {
// synchronized {
// if (instance == null) {
// val wordBlacklist = Seq("a", "b", "c")
// instance = sc.broadcast(wordBlacklist)
// }
// }
// }
// return instance
// }
// }
// /**
// * Use this singleton to get or register an Accumulator.
// */
// object DroppedWordsCounter {
// @volatile private var instance: LongAccumulator = null
// def getInstance(sc: SparkContext): LongAccumulator = {
// if (instance == null) {
// synchronized {
// if (instance == null) {
// instance = sc.longAccumulator("WordsInBlacklistCounter")
// }
// }
// }
// return instance
// }
// }
// object OnlineRecommender{
// def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
// // 如果已存在CheckPoint,就不进入该方法
// println("Creating new context")
// // Streaming处理的结果存放位置
// val outputFile = new File(outputPath.split(":")(1))
// if (outputFile.exists()) outputFile.delete()
// val conf = new SparkConf().setAppName("OnlineRecommender")
// // 默认本地模式运行
// val isDebug = true
// if (isDebug) {
// conf.setMaster("local[2]")
// }
// // Create the context with a 1 second batch size
// val ssc = new StreamingContext(conf, Seconds(10))
// // checkpoint存放位置
// ssc.checkpoint(checkpointDirectory)
// // 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999
// val lines = ssc.socketTextStream(ip, port)
// // 将每一行拆分成单词 val words = lines.flatMap(_.split(" "))
// val words = lines.flatMap(_.split(" "))
// val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// // Get or register the blacklist Broadcast
// val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// // Get or register the droppedWordsCounter Accumulator
// val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// // Use blacklist to drop words and use droppedWordsCounter to count them
// /*
// * 累加器进行累加操作,blacklist.value的出现总次数
// */
// val counts = rdd.filter { case (word, count) =>
// printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
// if (blacklist.value.contains(word)) {
// droppedWordsCounter.add(count)
// println("return false")
// false
// } else {
// println("return true")
// true
// }
// }.collect().mkString("[", ", ", "]")
// val output = "Counts at time " + time + " " + counts
// println(output)
// println("Dropped " + droppedWordsCounter.value + " word(s) totally")
// println("Appending to " + outputFile.getAbsolutePath)
// Files.append(output + "\n", outputFile, Charset.defaultCharset())
// }
// return ssc
// }
// def main(args: Array[String]): Unit = {
// val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
// // 设置CheckPoint
// val (ip, port, outputPath, checkpointDir) = ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
// val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(ip, port, outputPath, checkpointDir))
// ssc.start() // 启动计算
// ssc.awaitTermination() // 等待计算的终止
// }
// }
\ No newline at end of file
package apache.wiki
import org.apache.spark._
import org.apache.spark.streaming._
* @author ${user.name}
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
* See LICENSE file for further information.
* 参考地址
* 推荐系统: http://www.kuqin.com/shuoit/20151202/349305.html
* ALS说明: http://www.csdn.net/article/2015-05-07/2824641
object StreamingWordCount{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// 将每一行拆分成单词 val words = lines.flatMap(_.split(" "))
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
// 注意 : 必需要触发 action(很多初学者会忘记触发action操作,导致报错:No output operations registered, so nothing to execute)
ssc.start() // 启动计算
ssc.awaitTermination() // 等待计算的终止
\ No newline at end of file
......@@ -21,14 +21,14 @@ object WordCount {
// sys.exit()
// }
val lines = sc.textFile("file:/opt/git/RecommendedSystem/README.md")
val lines = sc.textFile("file:/opt/git/RecommenderSystems/README.md")
lines.flatMap(_.split(" "))
.map((_, 1))
.map(x => (x._2, x._1))
.map(x => (x._2, x._1))
// println("this system exit ok!!!")
