diff --git a/README.md b/README.md
index ad9162fc493353da545964990d3e1c64f9182fc5..11d49f27eee3100574a14c47d5b5960a75b1367a 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,7 @@
-name := "NetflixRecommender"
-
-version := "1.0"
-
-scalaVersion := "2.10.4"
-libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.4.0"
-libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.4.0"
-libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"
-libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.0"
-libraryDependencies += "org.mongodb" %% "casbah" % "3.0.0"
-libraryDependencies += "org.jblas" % "jblas" % "1.2.4"
+# Spark Recommender System

-mergeStrategy in assembly <<= (mergeStrategy in assembly) { mergeStrategy => {
-  case entry => {
-    val strategy = mergeStrategy(entry)
-    if (strategy == MergeStrategy.deduplicate) MergeStrategy.first
-    else strategy
-  }
-}
+## Offline
+
+## Online
+
+## Summary
diff --git a/bin/start.sh b/bin/start.sh
index 4984885e5488406541e160fe1ce014992f5abe7e..24e28a37caaff48c8be7197b8ceca183bf19c662 100644
--- a/bin/start.sh
+++ b/bin/start.sh
@@ -3,8 +3,16 @@
 mvn clean
 mvn compile
 mvn package
-echo 'package success'
-# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.App /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
-# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.WordCount /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
-/opt/spark/bin/spark-submit --class apache.wiki.OfflineRecommender /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
+if [ $? -eq 0 ]
+then
+    echo "package success"
+else
+    echo "package failed"
+    exit 1
+fi
+# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.App /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
+# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.WordCount /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
+# /opt/spark/bin/spark-submit --class apache.wiki.OfflineRecommender /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
+# /opt/spark/bin/spark-submit --class apache.wiki.StreamingWordCount /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
+/opt/spark/bin/spark-submit --class apache.wiki.OnlineRecommender /opt/git/RecommenderSystems/target/RecommenderSystems-1.0-SNAPSHOT.jar
 echo 'spark-submit success'
diff --git a/conf/test.json b/conf/test.json
new file mode 100644
index 0000000000000000000000000000000000000000..8e3c18e133d499932ee91478045a4b1828fdbc4f
--- /dev/null
+++ b/conf/test.json
@@ -0,0 +1,24 @@
+{
+    "resources": {
+        "local_catch_file": "log/cache.txt",
+        "local_schema_file": "conf/ad_data.avsc"
+    },
+    "kafka":{
+        "topics": "datasys",
+        "consumer_group": "datasys_kafka2hive",
+        "brokerHosts": "zk1.common.ad.m.com:9092,zk2.common.ad.m.com:9092,zk3.common.ad.m.com:9092",
+        "zkHosts": "zk1.common.ad.m.com:2181,zk2.common.ad.m.com:2181,zk3.common.ad.m.com:2181",
+        "message_num": 400,
+        "root_dir": "/var/tmp"
+    },
+    "hdfs": {
+        "hdfs_name": "hadoopuser",
+        "hdfs_port": 50070,
+        "hdfs_host": "BJSH-ADHBASE-134-128.meitu-inc.com",
+        "hdfs_path": "/user/hadoopuser/jiangzl"
+    },
+    "hive": {
+        "hive_port": 10000,
+        "hive_host": "BJSH-ADHBASE-134-128.meitu-inc.com"
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fe4dd59f5bb99f7bdff748de744c892b6b1015cf..a010f53d2e01787d277d35da2c356861a834cf64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>apache.wiki</groupId>
-  <artifactId>RecommendedSystem</artifactId>
+  <artifactId>RecommenderSystems</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>${project.artifactId}</name>
   <description>My wonderfull scala app</description>
@@ -78,6 +78,11 @@
       <artifactId>spark-mllib_${scala.compat.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
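The new conf/test.json above is not read by any code in this change. If it is meant to be consumed from Scala, a minimal sketch along the following lines should work; json4s is already on the classpath through the spark-core dependency, the field names simply mirror the JSON added above, and the object name `TestConfig` is hypothetical.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import scala.io.Source

// Hypothetical helper, not part of this diff: loads conf/test.json and pulls out a few fields.
object TestConfig {
  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats
    val json = parse(Source.fromFile("conf/test.json").mkString)

    // Field names follow the JSON added in this diff.
    val topics  = (json \ "kafka" \ "topics").extract[String]
    val brokers = (json \ "kafka" \ "brokerHosts").extract[String]
    val hdfsUri = "hdfs://" + (json \ "hdfs" \ "hdfs_host").extract[String] + ":" +
                  (json \ "hdfs" \ "hdfs_port").extract[Int]

    println(s"topics=$topics brokers=$brokers hdfs=$hdfsUri")
  }
}
```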
diff --git a/src/main/scala/apache/wiki/CheckPointWordCount.scala b/src/main/scala/apache/wiki/CheckPointWordCount.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2047bbea222a84477e8544a132983c23007f5134
--- /dev/null
+++ b/src/main/scala/apache/wiki/CheckPointWordCount.scala
@@ -0,0 +1,128 @@
+package apache.wiki
+
+import java.io.File
+import java.nio.charset.Charset
+
+import com.google.common.io.Files
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+import org.apache.spark.util.{IntParam, LongAccumulator}
+/**
+ * @author ${user.name}
+ * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
+ *
+ * See LICENSE file for further information.
+ *
+ * Reference links
+ * GitHub: https://github.com/apachecn/RecommenderSystems
+ * https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+ */
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    return instance
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+object DroppedWordsCounter {
+
+  @volatile private var instance: LongAccumulator = null
+
+  def getInstance(sc: SparkContext): LongAccumulator = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.longAccumulator("WordsInBlacklistCounter")
+        }
+      }
+    }
+    return instance
+  }
+}
+
+object CheckPointWordCount {
+  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
+
+    // This method is only invoked when no checkpoint exists yet
+    println("Creating new context")
+
+    // Location where the streaming results are written
+    val outputFile = new File(outputPath.split(":")(1))
+    if (outputFile.exists()) outputFile.delete()
+
+    val conf = new SparkConf().setAppName("CheckPointWordCount")
+    // Run in local mode by default
+    val isDebug = true
+    if (isDebug) {
+      conf.setMaster("local[2]")
+    }
+    // Create the context with a 10 second batch interval
+    val ssc = new StreamingContext(conf, Seconds(10))
+    // Checkpoint directory
+    ssc.checkpoint(checkpointDirectory)
+
+    // Create a DStream that connects to hostname:port, e.g. localhost:9999
+    val lines = ssc.socketTextStream(ip, port)
+    // Split each line into words
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
+
+    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+      // Get or register the blacklist Broadcast
+      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+      // Get or register the droppedWordsCounter Accumulator
+      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      /*
+       * The accumulator sums up the total number of occurrences of words in blacklist.value
+       */
+      val counts = rdd.filter { case (word, count) =>
+        printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
+        if (blacklist.value.contains(word)) {
+          droppedWordsCounter.add(count)
+          println("return false")
+          false
+        } else {
+          println("return true")
+          true
+        }
+      }.collect().mkString("[", ", ", "]")
+      val output = "Counts at time " + time + " " + counts
+      println(output)
+      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
+      println("Appending to " + outputFile.getAbsolutePath)
+      Files.append(output + "\n", outputFile, Charset.defaultCharset())
+    }
+    return ssc
+  }
+
+
+  def main(args: Array[String]): Unit = {
+
+    val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
+
+    // Set up checkpointing
+    val (ip, port, outputPath, checkpointDir) = ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
+    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(ip, port, outputPath, checkpointDir))
+
+    ssc.start()             // Start the computation
+    ssc.awaitTermination()  // Wait for the computation to terminate
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/apache/wiki/OfflineRecommender.scala b/src/main/scala/apache/wiki/OfflineRecommender.scala
index 0e17b69c98ddc929785f3ba7e1cfa204801ca80d..1da263325f79442314bbd837a54d96771f38c5bd 100644
--- a/src/main/scala/apache/wiki/OfflineRecommender.scala
+++ b/src/main/scala/apache/wiki/OfflineRecommender.scala
@@ -15,7 +15,7 @@ import org.apache.spark.rdd.RDD
  * See LICENSE file for further information.
  *
  * Reference links
- * GitHub:
+ * GitHub: https://github.com/apachecn/RecommenderSystems
  * Recommender system: http://www.kuqin.com/shuoit/20151202/349305.html
  * ALS overview: http://www.csdn.net/article/2015-05-07/2824641
  */
@@ -107,7 +107,7 @@ object OfflineRecommender {
     // Initialize the SparkContext
     val conf = new SparkConf().setMaster("local").setAppName("OfflineRecommender")
     val sc = new SparkContext(conf)
-    val base = if (args.length > 0) args(0) else "file:/opt/git/RecommendedSystem/"
+    val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"

     // Load the data as RDDs
     // UserMovies format: user name, movie ID, rating
@@ -119,14 +119,14 @@
     preparation(rawUserMoviesData, rawHotMoviesData)
     println("Data preparation finished")
     // Evaluate recommendations on a sample
-    model(sc, rawUserMoviesData, rawHotMoviesData)
+    // model(sc, rawUserMoviesData, rawHotMoviesData)
     // Evaluate the overall recommendation scores
-    evaluate(sc,rawUserMoviesData, rawHotMoviesData)
+    // evaluate(sc,rawUserMoviesData, rawHotMoviesData)

-    // recommend(sc, rawUserMoviesData, rawHotMoviesData, base)
+    recommend(sc, rawUserMoviesData, rawHotMoviesData, base)

     // Only one SparkContext can be active per JVM; stop() the active SparkContext before creating a new one.
-    // sc.stop()
+    sc.stop()
   }
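CheckPointWordCount reads from a TCP socket (localhost:9999 by default) and recovers its StreamingContext from the checkpoint directory on restart. To exercise it locally something has to be listening on that port and printing lines, for example `nc -lk 9999`; the small hypothetical feeder below (not part of the repo) does the same thing in Scala. The words "a", "b" and "c" are in the blacklist broadcast above, so they should show up in the dropped-words counter rather than in the output file.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

// Hypothetical test helper, not part of this diff: emits one line of words per second
// on port 9999 so that socketTextStream-based jobs have something to consume.
object SocketFeeder {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    println("Waiting for a streaming job to connect on port 9999 ...")
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    val vocabulary = Seq("a", "b", "c", "d", "e", "spark", "streaming")
    while (true) {
      out.println(Random.shuffle(vocabulary).take(4).mkString(" "))
      Thread.sleep(1000)
    }
  }
}
```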
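The bodies of preparation, model, evaluate and recommend lie outside this diff, so the following is only a sketch of what an ALS-based recommend step typically looks like with the RDD-based MLlib API; the hyperparameters, the numeric movie-ID parsing and the helper name recommendSketch are assumptions, not values taken from the project.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// Sketch only: rawUserMoviesData lines are assumed to be "userName,movieID,rating",
// matching the comment in OfflineRecommender above.
def recommendSketch(sc: SparkContext, rawUserMoviesData: RDD[String]): Unit = {
  // MLlib's ALS works on integer IDs, so map the textual user names to Ints first.
  val userIds = rawUserMoviesData.map(_.split(",")(0)).distinct()
    .zipWithUniqueId().mapValues(_.toInt).collectAsMap()
  val bUserIds = sc.broadcast(userIds)

  val ratings = rawUserMoviesData.map { line =>
    val Array(user, movie, rating) = line.split(",")
    Rating(bUserIds.value(user), movie.toInt, rating.toDouble)
  }.cache()

  // rank = 10, iterations = 10, lambda = 0.01, alpha = 1.0 are illustrative, untuned values.
  val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)

  // Top 5 movie IDs for an arbitrary user from the training data.
  val someUser = ratings.first().user
  model.recommendProducts(someUser, 5).foreach(println)
}
```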
diff --git a/src/main/scala/apache/wiki/OnlineRecommender.scala b/src/main/scala/apache/wiki/OnlineRecommender.scala
index f569d4898027cbd59503b74ad28eab46b6ab2c85..bf5b1479972d42402709dca5a4a159efacf0412f 100644
--- a/src/main/scala/apache/wiki/OnlineRecommender.scala
+++ b/src/main/scala/apache/wiki/OnlineRecommender.scala
@@ -1,18 +1,128 @@
-package apache.wiki
+// package apache.wiki

-import scala.collection.Map
+// import java.io.File
+// import java.nio.charset.Charset

-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
+// import com.google.common.io.Files
+// import org.apache.spark.{SparkConf, SparkContext}
+// import org.apache.spark.broadcast.Broadcast
+// import org.apache.spark.rdd.RDD
+// import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+// import org.apache.spark.util.{IntParam, LongAccumulator}
+// /**
+//  * @author ${user.name}
+//  * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
+//  *
+//  * See LICENSE file for further information.
+//  *
+//  * Reference links
+//  * GitHub: https://github.com/apachecn/RecommenderSystems
+//  * https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+//  */

-/**
- * @author ${user.name}
- * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
- *
- * See LICENSE file for further information.
- *
- * Reference links
- * Recommender system: http://www.kuqin.com/shuoit/20151202/349305.html
- * ALS overview: http://www.csdn.net/article/2015-05-07/2824641
- */
\ No newline at end of file
+// object WordBlacklist {
+
+//   @volatile private var instance: Broadcast[Seq[String]] = null
+
+//   def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+//     if (instance == null) {
+//       synchronized {
+//         if (instance == null) {
+//           val wordBlacklist = Seq("a", "b", "c")
+//           instance = sc.broadcast(wordBlacklist)
+//         }
+//       }
+//     }
+//     return instance
+//   }
+// }
+
+// /**
+//  * Use this singleton to get or register an Accumulator.
+//  */
+// object DroppedWordsCounter {
+
+//   @volatile private var instance: LongAccumulator = null
+
+//   def getInstance(sc: SparkContext): LongAccumulator = {
+//     if (instance == null) {
+//       synchronized {
+//         if (instance == null) {
+//           instance = sc.longAccumulator("WordsInBlacklistCounter")
+//         }
+//       }
+//     }
+//     return instance
+//   }
+// }
+
+// object OnlineRecommender {
+//   def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
+
+//     // This method is only invoked when no checkpoint exists yet
+//     println("Creating new context")
+
+//     // Location where the streaming results are written
+//     val outputFile = new File(outputPath.split(":")(1))
+//     if (outputFile.exists()) outputFile.delete()
+
+//     val conf = new SparkConf().setAppName("OnlineRecommender")
+//     // Run in local mode by default
+//     val isDebug = true
+//     if (isDebug) {
+//       conf.setMaster("local[2]")
+//     }
+//     // Create the context with a 10 second batch interval
+//     val ssc = new StreamingContext(conf, Seconds(10))
+//     // Checkpoint directory
+//     ssc.checkpoint(checkpointDirectory)
+
+//     // Create a DStream that connects to hostname:port, e.g. localhost:9999
+//     val lines = ssc.socketTextStream(ip, port)
+//     // Split each line into words
+//     val words = lines.flatMap(_.split(" "))
+//     val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
+
+//     wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+//       // Get or register the blacklist Broadcast
+//       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+//       // Get or register the droppedWordsCounter Accumulator
+//       val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+//       // Use blacklist to drop words and use droppedWordsCounter to count them
+//       /*
+//        * The accumulator sums up the total number of occurrences of words in blacklist.value
+//        */
+//       val counts = rdd.filter { case (word, count) =>
+//         printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
+//         if (blacklist.value.contains(word)) {
+//           droppedWordsCounter.add(count)
+//           println("return false")
+//           false
+//         } else {
+//           println("return true")
+//           true
+//         }
+//       }.collect().mkString("[", ", ", "]")
+//       val output = "Counts at time " + time + " " + counts
+//       println(output)
+//       println("Dropped " + droppedWordsCounter.value + " word(s) totally")
+//       println("Appending to " + outputFile.getAbsolutePath)
+//       Files.append(output + "\n", outputFile, Charset.defaultCharset())
+//     }
+//     return ssc
+//   }
+
+
+//   def main(args: Array[String]): Unit = {
+
+//     val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
+
+//     // Set up checkpointing
+//     val (ip, port, outputPath, checkpointDir) = ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
+//     val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(ip, port, outputPath, checkpointDir))
+
+//     ssc.start()             // Start the computation
+//     ssc.awaitTermination()  // Wait for the computation to terminate
+//   }
+// }
\ No newline at end of file
diff --git a/src/main/scala/apache/wiki/StreamingWordCount.scala b/src/main/scala/apache/wiki/StreamingWordCount.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6423e857fcf54e039afd8a9ad2cdff46965438c9
--- /dev/null
+++ b/src/main/scala/apache/wiki/StreamingWordCount.scala
@@ -0,0 +1,39 @@
+package apache.wiki
+
+import org.apache.spark._
+import org.apache.spark.streaming._
+
+/**
+ * @author ${user.name}
+ * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
+ *
+ * See LICENSE file for further information.
+ *
+ * Reference links
+ * Recommender system: http://www.kuqin.com/shuoit/20151202/349305.html
+ * ALS overview: http://www.csdn.net/article/2015-05-07/2824641
+ */
+
+object StreamingWordCount {
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
+    val ssc = new StreamingContext(conf, Seconds(1))
+
+    // Create a DStream that connects to hostname:port, e.g. localhost:9999
+    val lines = ssc.socketTextStream("localhost", 9999)
+
+    // Split each line into words
+    val words = lines.flatMap(_.split(" "))
+
+    val pairs = words.map(word => (word, 1))
+    val wordCounts = pairs.reduceByKey(_ + _)
+
+    // Print the first ten elements of every RDD generated in this DStream to the console
+    // Note: an output action must be triggered (beginners often forget this and get the error: No output operations registered, so nothing to execute)
+    wordCounts.print()
+
+    ssc.start()             // Start the computation
+    ssc.awaitTermination()  // Wait for the computation to terminate
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/apache/wiki/WordCount.scala b/src/main/scala/apache/wiki/WordCount.scala
index 64410bf9fc4f4425c439c4552fe735903029fa95..b40fade85d67c4a37346713ce674bcdcb1511b34 100644
--- a/src/main/scala/apache/wiki/WordCount.scala
+++ b/src/main/scala/apache/wiki/WordCount.scala
@@ -21,14 +21,14 @@ object WordCount {
     //   sys.exit()
     // }

-    val lines = sc.textFile("file:/opt/git/RecommendedSystem/README.md")
+    val lines = sc.textFile("file:/opt/git/RecommenderSystems/README.md")
     lines.flatMap(_.split(" "))
         .map((_, 1))
         .reduceByKey(_+_)
         .map(x => (x._2, x._1))
         .sortByKey(false)
         .map(x => (x._2, x._1))
-        .saveAsTextFile("file:/opt/git/RecommendedSystem/output/result.log")
+        .saveAsTextFile("file:/opt/git/RecommenderSystems/output/result.log")

     // println("this system exit ok!!!")
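WordCount sorts by count by mapping to (count, word), sorting by key, and mapping back. With the RDD API the same result can be written more directly with sortBy, or with top when only the leading entries are needed; the sketch below is a hypothetical alternative, not part of this change, and the output path is illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the same sort-by-count that WordCount performs with map/sortByKey/map,
// expressed with sortBy and top instead.
object WordCountSorted {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("WordCountSorted"))
    val counts = sc.textFile("file:/opt/git/RecommenderSystems/README.md")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Descending by count, then write the (word, count) pairs out.
    counts.sortBy(_._2, ascending = false)
      .saveAsTextFile("file:/opt/git/RecommenderSystems/output/result_sorted")

    // Or fetch just the ten most frequent words to the driver.
    counts.top(10)(Ordering.by[(String, Int), Int](_._2)).foreach(println)

    sc.stop()
  }
}
```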