提交 8b65e9f7 编写于 作者: Y Yao544303

ItemCF Finish

上级 d31d19f1
...@@ -5,10 +5,11 @@ ...@@ -5,10 +5,11 @@
# 目录规划 # 目录规划
data 测试用数据集合 * data 测试用数据集合
standalone 相关实践的单机实现版本(主要为python实现) * standalone 相关实践的单机实现版本(主要为python实现)
spark 相关实践的spark版本(主要为scala实现) * spark 相关实践的spark版本(主要为scala实现)
manual 相关资料集合 已经完成特征工程,ItemCF 部分
* manual 相关资料集合
# 计划项(恩 就是挖坑的意思) # 计划项(恩 就是挖坑的意思)
## 推荐算实现 ## 推荐算实现
...@@ -30,8 +31,6 @@ Markov Chain ...@@ -30,8 +31,6 @@ Markov Chain
社交网络 社交网络
.... ....
## paper阅读笔记
## 评价系统实现 ## 评价系统实现
4 260:5 480:4
2 21:1 95:2 110:5 163:4 165:3 235:3 265:4 292:3 318:5 349:4 356:5 368:4 380:5 434:2 442:3 457:4 459:3 480:5 498:3 515:5 589:4 590:5 593:5 647:3 648:4 736:4 780:3 902:2 920:5 982:4
4 260:5 480:4
2 21:1 95:2 110:5 163:4 165:3 235:3 265:4 292:3 318:5 349:4 356:5 368:4 380:5 434:2 442:3 457:4 459:3 480:5 498:3 515:5 589:4 590:5 593:5 647:3 648:4 736:4 780:3 902:2 920:5 982:4
5 6:2 16:3 24:1 29:5 32:4 34:4 36:3 39:3 41:4 47:3 50:5 52:2 150:2 162:4 176:4 194:3 202:2 215:3 224:3 229:3 265:3 272:3 288:2 296:4 299:3 318:3 321:3 348:4 353:2 356:1 357:2 377:4 412:2 461:3 497:3 501:1 506:4 509:4 515:4 551:4 562:4 581:3 593:4 608:4 714:4 728:4 733:1 800:2 860:2 866:4 896:4 908:4 913:5 919:4 968:3 994:5 5 6:2 16:3 24:1 29:5 32:4 34:4 36:3 39:3 41:4 47:3 50:5 52:2 150:2 162:4 176:4 194:3 202:2 215:3 224:3 229:3 265:3 272:3 288:2 296:4 299:3 318:3 321:3 348:4 353:2 356:1 357:2 377:4 412:2 461:3 497:3 501:1 506:4 509:4 515:4 551:4 562:4 581:3 593:4 608:4 714:4 728:4 733:1 800:2 860:2 866:4 896:4 908:4 913:5 919:4 968:3 994:5
3 104:4 260:5 480:4 552:4 590:4 593:3 648:3 653:4 733:5 3 104:4 260:5 480:4 552:4 590:4 593:3 648:3 653:4 733:5
1 1:5 48:5 150:5 260:4 527:5 531:4 588:4 594:4 595:5 608:4 661:3 720:3 745:3 783:4 914:3 919:4 938:4 1 1:5 48:5 150:5 260:4 527:5 531:4 588:4 594:4 595:5 608:4 661:3 720:3 745:3 783:4 914:3 919:4 938:4
# 特征工程部分
代码位于Features 类
因为现实系统的rating 值,非常稀疏,为了节省存储空间和提升效率,在特征存储结构上,需要进行一些改动。主要思路如下:
## 将rating 转为libsvm的方式存储
例如: 输入为
1 661:3 914:3
## 将rating 转为<id, features> 格式的DataFrame
id 为String
features 为 SparseVector
# ItemCF
代码位于ItemCF 类
## 相似度计算
实现了两种方式,Jaccard 相似度 和 余弦相似度
## Jaccard
使用BitSet 存储每个用户的对该Item 是否有Ratting 的情况。
## 余弦相似度
## 基于Item 相似度 推荐单个物品
## 基于Item 相似度 推荐topK 的物品
# UserCF
## 根据topN的相似用户推荐
\ No newline at end of file
...@@ -53,14 +53,15 @@ object Features{ ...@@ -53,14 +53,15 @@ object Features{
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
/** /**
* 如需使用集群模式,修改入参,使用如下命令提交即可
* 提交命令 spark-submit --master yarn-cluster <ratingsPath> <libSVMOutPath> <dfOutPath> <featureSize> * 提交命令 spark-submit --master yarn-cluster <ratingsPath> <libSVMOutPath> <dfOutPath> <featureSize>
*/ */
val conf = new SparkConf().setAppName("Features Prepare") val conf = new SparkConf().setAppName("Features Prepare").setMaster("local[*]")
val sc = new SparkContext(conf) val sc = new SparkContext(conf)
val ratingsPath = args(0) val ratingsPath = "..//data//ml-1m//ratings"
val libSVMOutPath = args(1) val libSVMOutPath = "..//data//libSVMPath"
val dfOutPath = args(2) val dfOutPath = "..//data//DFPath"
val featureSize = args(3).toInt val featureSize = 3953
val testF = new Features val testF = new Features
...@@ -69,3 +70,4 @@ object Features{ ...@@ -69,3 +70,4 @@ object Features{
} }
} }
...@@ -21,7 +21,7 @@ class ItemCF { ...@@ -21,7 +21,7 @@ class ItemCF {
val sqlContext = new SQLContext(sc) val sqlContext = new SQLContext(sc)
val rdd = sqlContext.read.parquet(featurePath).select("features") val rdd = sqlContext.read.parquet(featurePath).select("features")
.rdd.map(x=>x(0).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector]) .rdd.map(x=>x(0).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector])
.map(x=>(x.indices)) .map(x=>x.indices)
.zipWithIndex() .zipWithIndex()
.map(x=>{ .map(x=>{
for (i <- x._1) yield { for (i <- x._1) yield {
...@@ -104,6 +104,13 @@ class ItemCF { ...@@ -104,6 +104,13 @@ class ItemCF {
} }
* 载入相似度矩阵
* @param sc
* @param simPath
* @param featruesSize
* @return
def loadSimMatrix(sc: SparkContext, def loadSimMatrix(sc: SparkContext,
simPath: String, simPath: String,
featruesSize: Int featruesSize: Int
...@@ -130,59 +137,144 @@ class ItemCF { ...@@ -130,59 +137,144 @@ class ItemCF {
} }
def predictByMatrix(sc: SparkContext, /**
simMatrix: breeze.linalg.Matrix[Double], * 根据Item 编号,从相似矩阵中获取该Item 的相似向量
featuresSize: Int, * @param simMatrix
featurePath: String, * @param itemNum
resultPath: String * @return
): Unit ={ */
val rdd = sc.textFile(featurePath) def getSimVecFromMatrix(simMatrix: SparseMatrix, itemNum: Int):Array[Double] ={
.map(_.split(" ")) val arr1 = for (i <- 0 until itemNum) yield {
.map(x=>(x.filter(g=>g.contains(":")))) simMatrix(i, itemNum)
.map(x=>(x.map(_.split(":")).map(ar => (ar(0).toInt,ar(1).toDouble)))) }
.map(x=>{ val arr2 = for (i <- itemNum until simMatrix.numRows) yield {
val idx = x.map(_._1) simMatrix(itemNum, i)
val v = x.map(_._2) }
val vec: SparseVector[Double] = new SparseVector(idx, v, featuresSize) (arr1 ++ arr2).toArray
vec }
* 基于Item 相似度向量 计算推荐单个物品时的得分,输出结果按得分降序排序
* @param sc
* @param sim
* @param featurePath
* @return
def predictBySimVecWithLibSVM(sc: SparkContext,
sim: Array[Double],
featurePath: String): RDD[(String, Double)] ={
sc.textFile(featurePath).map(_.split(" ")).map(x=>{
val id = x(0)
var score = 0.0
for (i <- 1 until x.length){
val idx = x(i).split(":")(0)
val value = x(i).split(":")(1)
score += value.toDouble * sim(idx.toInt)
* 基于Item 相似度向量 计算推荐单个物品时的得分,输出结果按得分降序排序
* @param sc
* @param sim
* @param featurePath
* @return
def predictBySimVecWithDF(sc: SparkContext,
sim: Array[Double],
featurePath: String): RDD[(String, Double)] ={
val sqlContext = new SQLContext(sc)
val p = x(0).toString
val v = x(1).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector]
val idxs = v.toSparse.indices
val values = v.toSparse.values
var score = 0.0
for (i <- 0 until idxs.length){
score += values(i) * sim(idxs(i))
}) })
// .map(x=>(x.toDenseVector.toDenseMatrix.dot(simMatrix))) .sortBy(_._2,false)
} }
} }
object ItemCF extends ItemCF{ object ItemCF extends ItemCF{
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("utils").setMaster("local[8]") val conf = new SparkConf().setAppName("utils").setMaster("local[8]")
val sc = new SparkContext(conf) val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) val sqlContext = new SQLContext(sc)
val libsvmFeaturePath = "C:\\workspace\\data\\apacheCN\\libsvmOut" val libsvmFeaturePath = "..//data//libSVMPath"
val dfFeaturePath = "C:\\workspace\\data\\apacheCN\\dfOut" val dfFeaturePath = "..//data//DFPath"
val simPath = "C:\\workspace\\data\\apacheCN\\simPath" val simPath = "..//data//SimPath"
// val JaccardSimPath = "..//data//jaccardSim"
val CosSimPath = "..//data//cosSim"
val featureSize = 3953 val featureSize = 3953
// testComputeSim()
// val sim1 = computeJaccardSimWithLibSVM(sc,libsvmFeaturePath) testSaveAndLoadSimMatrix()
// sim1.entries.take(10)
def testComputeSim(): Unit ={
println("Test Compute Jaccard Sim With LibSVM ")
val sim1 = computeJaccardSimWithLibSVM(sc,libsvmFeaturePath)
val sim2 = computeCosSimWithLibSVM(sc,featureSize,libsvmFeaturePath)
// sim2.entries.take(10).foreach(println)
// saveSimMatrix(sc,simPath,sim2)
println("Test Compute Cossin Sim With LibSVM ")
val sim2 = computeCosSimWithLibSVM(sc,featureSize,libsvmFeaturePath)
println("Test Compute Jaccard Sim With DataFrame ")
val sim3 = computeJaccardSimWithDF(sc,dfFeaturePath) val sim3 = computeJaccardSimWithDF(sc,dfFeaturePath)
sim3.entries.take(10) sim3.entries.take(3).foreach(println)
println("Test Compute Cossin Sim With DataFrame ")
val sim4 = computeCosSimWithDF(sc,dfFeaturePath)
val sim4 = computeCosSimWithDF(sc,dfFeaturePath)
// computeItemJaccardSim(sc,featurePath, JaccardSimPath) def testSaveAndLoadSimMatrix(): Unit ={
// computeItemCosSim(sc,100,featurePath, CosSimPath) val sim = computeCosSimWithLibSVM(sc,featureSize,libsvmFeaturePath)
val simMatrix = loadSimMatrix(sc, simPath, featureSize) saveSimMatrix(sc,simPath,sim)
// val score = predict()
println("Save The SimMatrix")
val simLoad = loadSimMatrix(sc, simPath, featureSize)
println(s"Load The SimMatrix. The Row Num Is ${simLoad.numRows} The Col Num Is ${simLoad.numCols}")
def testPredict(): Unit ={
val simMatrix = loadSimMatrix(sc, simPath, featureSize)
println(s"Load The SimMatrix. The Row Num Is ${simMatrix.numRows} The Col Num Is ${simMatrix.numCols}")
val itemNum = 800
val simVec = getSimVecFromMatrix(simMatrix,itemNum)
println("Test Predict By SimVec With LibSVM ")
val score1 = predictBySimVecWithLibSVM(sc, simVec, libsvmFeaturePath)
println("Test Predict By SimVec With DataFrame ")
val score2 = predictBySimVecWithDF(sc, simVec, dfFeaturePath)
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册