Unverified · Commit 64b3468b · Author: miao18 · Committer: GitHub

Merge pull request #3 from Yao544303/master

Implement Item CF with Spark
*.class
*.log
/spark/target
/*/.idea
/spark/derby.log
/spark/*.iml
/standalone/mycache
# Spark Recommender System
# Project Introduction
Xiang Liang's [《推荐系统实践》 (Recommender System Practice)](https://book.douban.com/subject/10769749/) is a classic book on recommender systems.
However, it was written before big data technologies became as widespread as they are today, so the demo code in the book is not suited to running on large-scale datasets.
Hence the idea of re-implementing the relevant material with Spark.
# Directory Layout
* data: test datasets
* standalone: single-machine implementations of the exercises (mostly Python)
* spark: Spark implementations of the exercises (mostly Scala); feature engineering and the ItemCF part are done
* manual: collected reference material
# Planned Items (in other words, placeholders to fill in)
## Recommendation algorithm implementations
### Algorithms based on user behavior data
ItemCF
UserCF
Association rules
LFM
Graph
ALS
### Algorithms based on user tag data
LDA
TFIDF
TagCF
## Exploratory research (implementations of individual papers)
Markov Chain
Social networks
....
## Evaluation system
## Recommender system architecture
### Surrounding infrastructure
#### User behavior log storage
#### Logging system
#### UI
### Functional modules
#### Data ingestion module
#### User feature generation module
#### Recommendation module
#### Filtering module
#### Ranking module
## Offline
## Real-time
## Summary
## Algorithms
{
"resources": {
"local_catch_file": "log/cache.txt",
"local_schema_file": "conf/ad_data.avsc"
},
"kafka":{
"topics": "datasys",
"consumer_group": "datasys_kafka2hive",
"brokerHosts": "zk1.common.ad.m.com:9092,zk2.common.ad.m.com:9092,zk3.common.ad.m.com:9092",
"zkHosts": "zk1.common.ad.m.com:2181,zk2.common.ad.m.com:2181,zk3.common.ad.m.com:2181",
"message_num": 400,
"root_dir": "/var/tmp"
},
"hdfs": {
"hdfs_name": "hadoopuser",
"hdfs_port": 50070,
"hdfs_host": "BJSH-ADHBASE-134-128.meitu-inc.com",
"hdfs_path": "/user/hadoopuser/jiangzl"
},
"hive": {
"hive_port": 10000,
"hive_host": "BJSH-ADHBASE-134-128.meitu-inc.com"
}
}
4 260:5 480:4
2 21:1 95:2 110:5 163:4 165:3 235:3 265:4 292:3 318:5 349:4 356:5 368:4 380:5 434:2 442:3 457:4 459:3 480:5 498:3 515:5 589:4 590:5 593:5 647:3 648:4 736:4 780:3 902:2 920:5 982:4
5 6:2 16:3 24:1 29:5 32:4 34:4 36:3 39:3 41:4 47:3 50:5 52:2 150:2 162:4 176:4 194:3 202:2 215:3 224:3 229:3 265:3 272:3 288:2 296:4 299:3 318:3 321:3 348:4 353:2 356:1 357:2 377:4 412:2 461:3 497:3 501:1 506:4 509:4 515:4 551:4 562:4 581:3 593:4 608:4 714:4 728:4 733:1 800:2 860:2 866:4 896:4 908:4 913:5 919:4 968:3 994:5
3 104:4 260:5 480:4 552:4 590:4 593:3 648:3 653:4 733:5
1 1:5 48:5 150:5 260:4 527:5 531:4 588:4 594:4 595:5 608:4 661:3 720:3 745:3 783:4 914:3 919:4 938:4
1::661::3::978302109
1::914::3::978301968
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
1::938::4::978301752
1::720::3::978300760
1::527::5::978824195
1::48::5::978824351
1::745::3::978824268
1::588::4::978824268
1::783::4::978824291
1::150::5::978301777
1::1::5::978824268
1::260::4::978300760
1::531::4::978302149
1::608::4::978301398
2::647::3::978299351
2::648::4::978299913
2::434::2::978300174
2::292::3::978300123
2::902::2::978298905
2::368::4::978300002
2::110::5::978298625
2::589::4::978299773
2::982::4::978299269
2::515::5::978298542
2::442::3::978300025
2::265::4::978299026
2::480::5::978299809
2::590::5::978299083
2::736::4::978300100
2::593::5::978298517
2::95::2::978300143
2::235::3::978299351
2::163::4::978299809
2::21::1::978299839
2::165::3::978300002
2::380::5::978299809
2::349::4::978299839
2::457::4::978299773
2::920::5::978298775
2::459::3::978300002
2::780::3::978299966
2::498::3::978299418
2::318::5::978298413
2::356::5::978299686
3::648::3::978297867
3::104::4::978298486
3::653::4::978297757
3::260::5::978297512
3::552::4::978297837
3::480::4::978297690
3::733::5::978297757
3::590::4::978297439
3::593::3::978297018
4::260::5::978294199
4::480::4::978294008
5::39::3::978245037
5::288::2::978246585
5::860::2::978244493
5::866::4::978245334
5::215::3::978245422
5::501::1::978244001
5::506::4::978245999
5::509::4::978245829
5::41::4::978244692
5::47::3::978245334
5::296::4::978244177
5::581::3::978244808
5::728::4::978244759
5::299::3::978242934
5::150::2::978245763
5::224::3::978245829
5::229::3::978246528
5::6::2::978245916
5::515::4::978245891
5::800::2::978244540
5::50::5::978244205
5::52::2::978246479
5::733::1::978245763
5::377::4::978245999
5::593::4::978244177
5::162::4::978244624
5::968::3::978242847
5::896::4::978244493
5::318::3::978244177
5::176::4::978244568
5::461::3::978244893
5::608::4::978244177
5::321::3::978245863
5::908::4::978241072
5::16::3::978245645
5::265::3::978245037
5::194::3::978246108
5::551::4::978246504
5::913::5::978242740
5::919::4::978241072
5::412::2::978245891
5::994::5::978244540
5::272::3::978245487
5::24::1::978242934
5::348::4::978245863
5::29::5::978245065
5::562::4::978244603
5::497::3::978245687
5::202::2::978246033
5::353::2::978246504
5::32::4::978244962
5::34::4::978244603
5::356::1::978241112
5::357::2::978245829
5::36::3::978244808
5::714::4::978244493
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>apache.wiki</groupId>
<artifactId>RecommenderSystems</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderful Scala app</description>
<inceptionYear>2015</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.7</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spark.version>2.0.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.compat.version}</artifactId>
<version>2.4.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-core_${scala.compat.version}</artifactId>
<version>2.4.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.compat.version}</artifactId>
<version>2.2.4</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!--<arg>-make:transitive</arg>-->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issues like NoClassDefFoundError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
/target/*
/.idea/*
/spark-warehouse/*
*.iml
/target/
/.settings/*
*.project
*.classpath
# Feature Engineering
The code lives in the Features class.
Rating data in real systems is extremely sparse, so the feature storage format needs a few adjustments to save space and improve efficiency. The main ideas are:
## Store ratings in libsvm format
For any single user, ratings over different items are very sparse, so the libsvm format is a natural fit for storing them.
```
Example: given the input
1::661::3::978302109
1::914::3::978301968
the converted result is
1 661:3 914:3
```
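A minimal sketch of this conversion in Spark (the full version lives in `Features.changeRatings2LibSVM`); `sc` is assumed to be an existing SparkContext and the paths are placeholders:
```scala
// Turn "user::item::rating::timestamp" lines into "user item1:r1 item2:r2 ..."
val lines = sc.textFile("data/ml-1m/ratings")          // placeholder input path
val libsvm = lines.map(_.split("::"))
  .map(f => (f(0), Array((f(1).toInt, f(2).toInt))))   // (user, [(item, rating)])
  .reduceByKey(_ ++ _)                                 // gather all of a user's ratings
  .map { case (user, pairs) =>
    user + " " + pairs.sortBy(_._1).map { case (i, r) => s"$i:$r" }.mkString(" ")
  }
libsvm.saveAsTextFile("data/libSVMPath")               // placeholder output path
```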
## Convert ratings into a DataFrame of <id, features>
id is a String
features is a SparseVector (see the example below)
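For example, the two ratings shown above become a single row; a hedged sketch, assuming the MovieLens 1M item-id space (3953):
```scala
import org.apache.spark.mllib.linalg.Vectors

// id = "1", features = a length-3953 sparse vector with ratings at item indices 661 and 914
val row = ("1", Vectors.sparse(3953, Seq((661, 3.0), (914, 3.0))))
```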
# ItemCF
The code lives in the ItemCF class.
## Similarity computation
Two variants are implemented: Jaccard similarity and cosine similarity.
## Jaccard
A BitSet records, for each item, which users have rated it; Jaccard similarity is the ratio of the intersection to the union of two such sets, as in the sketch below.
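A small illustration of the idea on two hand-built BitSets (the real code runs this over a cartesian product of the per-item sets):
```scala
import scala.collection.BitSet

val usersOfItemA = BitSet(1, 2, 5)   // users who rated item A
val usersOfItemB = BitSet(2, 3, 5)   // users who rated item B

// Jaccard similarity = |intersection| / |union|
val jaccard = (usersOfItemA & usersOfItemB).size.toDouble / (usersOfItemA | usersOfItemB).size
// here: 2.0 / 4 = 0.5
```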
## Cosine similarity
Implemented with the built-in MLlib API, as sketched below.
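Concretely, this means RowMatrix.columnSimilarities from MLlib, which returns the cosine similarity between every pair of columns (items). A minimal sketch with made-up rows; `sc` is assumed to be an existing SparkContext:
```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Each row is one user's sparse rating vector over the item space (toy data)
val rows = sc.parallelize(Seq(
  Vectors.sparse(4, Seq((0, 5.0), (2, 3.0))),
  Vectors.sparse(4, Seq((0, 4.0), (1, 1.0)))
))
val simMat = new RowMatrix(rows).columnSimilarities()  // item-item cosine similarities
simMat.entries.take(5).foreach(println)
```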
## Recommending a single item based on item similarity
Take the target item's similarity vector against all other items, then dot each user's feature vector with that similarity vector, as sketched below.
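A sketch of the scoring step for one user, assuming `sim` is the target item's similarity vector and the user's ratings are given as parallel index/value arrays (as in a SparseVector); `scoreUser` is a hypothetical helper, not part of the ItemCF class:
```scala
// Dot product of one user's sparse ratings with the target item's similarity vector
def scoreUser(itemIndices: Array[Int], ratings: Array[Double], sim: Array[Double]): Double =
  itemIndices.zip(ratings).map { case (i, r) => r * sim(i) }.sum
```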
## Recommending the top-K items based on item similarity
TODO
# UserCF
TODO
## Recommend based on the top-N most similar users
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>apache.wiki</groupId>
<artifactId>RecommenderSystems</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderful Scala app</description>
<inceptionYear>2015</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<!--
<spark.version>2.1.0</spark.version>
-->
<spark.version>1.6.2</spark.version>
<scala.version>2.10</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.7</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.3.0</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>example-${project.version}</finalName>
<artifactSet>
<excludes>
<exclude>oro*</exclude>
<!--exclude>org.apache.*:*</exclude-->
<exclude>junit:junit</exclude>
<exclude>org.*</exclude>
<exclude>au.*</exclude>
<exclude>codegen.*</exclude>
<exclude>images.*</exclude>
<exclude>javaewah.*</exclude>
<exclude>javassist.*</exclude>
<exclude>javax.*</exclude>
<exclude>javolution.*</exclude>
<exclude>jodd.*</exclude>
<exclude>parquet.*</exclude>
<exclude>repackage.*</exclude>
<exclude>templates.*</exclude>
<exclude>webapps.*</exclude>
<exclude>schemaorg_apache_xmlbeans.*</exclude>
<exclude>com.google.*</exclude>
<exclude>com.facebook.*</exclude>
<exclude>com.m.*</exclude>
<exclude>org.*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>log4j*</exclude>
<exclude>org.antlr*</exclude>
<exclude>org.datanucleus*</exclude>
<exclude>net*</exclude>
<exclude>commons*</exclude>
<exclude>com.j*</exclude>
<exclude>com.x*</exclude>
<exclude>com.n*</exclude>
<exclude>com.i*</exclude>
<exclude>com.t*</exclude>
<exclude>com.c*</exclude>
<exclude>com.gi*</exclude>
<exclude>com.f*</exclude>
<exclude>com.su*</exclude>
<exclude>com.a*</exclude>
<exclude>com.e*</exclude>
<exclude>javax*</exclude>
<exclude>s*</exclude>
<exclude>i*</exclude>
<exclude>j*</exclude>
<exclude>a*</exclude>
<exclude>x*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}.6</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<configuration>
<scalaVersion>${scala.version}.6</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
package com.apachecn.recommand.colfliter
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by ych on 2018/9/20.
*/
class Features {
/**
* Convert the ratings into a DataFrame of the form <id, features>.
*/
def changeRating2DF(sc: SparkContext,
ratingsPath: String,
size: Int,
OutPath: String
): Unit ={
val sqlContext = new SQLContext(sc)
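// Group each user's (item, rating) pairs and build one SparseVector of length `size` per user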
val ratingsRdd = sc.textFile(ratingsPath)
.map(_.split("::"))
.map(x=>(x(0),Array((x(1).toInt,x(2).toDouble))))
.reduceByKey(_ ++ _)
.map(x=>(x._1,Vectors.sparse(size,x._2)))
sqlContext.createDataFrame(ratingsRdd).toDF("id", "features")
.write.parquet(OutPath)
}
/**
* Convert the input ratings into libsvm format.
* Example: given the input
* 1::661::3::978302109
* 1::914::3::978301968
* the converted result is
* 1 661:3 914:3
*/
def changeRatings2LibSVM(sc: SparkContext,
ratingsPath: String,
ratingsLibSVMPath: String): Unit ={
sc.textFile(ratingsPath)
.map(_.split("::"))
.map(x=>(x(0),Array((x(1).toInt,x(2).toInt))))
.reduceByKey(_ ++ _)
.map(x=>(x._1+" " + x._2.sortBy(_._1).map(x=>(f"${x._1}:${x._2}")).mkString(" ")))
.saveAsTextFile(ratingsLibSVMPath)
}
}
object Features{
def main(args: Array[String]): Unit = {
/**
* To run in cluster mode, adjust the arguments and submit with:
* spark-submit --master yarn-cluster <ratingsPath> <libSVMOutPath> <dfOutPath> <featureSize>
*/
val conf = new SparkConf().setAppName("Features Prepare").setMaster("local[*]")
val sc = new SparkContext(conf)
val ratingsPath = "..//data//ml-1m//ratings"
val libSVMOutPath = "..//data//libSVMPath"
val dfOutPath = "..//data//DFPath"
val featureSize = 3953
val testF = new Features
testF.changeRatings2LibSVM(sc, ratingsPath, libSVMOutPath)
testF.changeRating2DF(sc, ratingsPath, featureSize, dfOutPath)
}
}
package com.apachecn.recommand.colfliter
import breeze.linalg.SparseVector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{DenseMatrix, SparseMatrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.collection.BitSet
/**
* Created by ych on 2018/9/20.
* Item-based collaborative filtering (ItemCF).
*/
class ItemCF {
def computeJaccardSimWithDF(sc: SparkContext,
featurePath: String
): CoordinateMatrix ={
val sqlContext = new SQLContext(sc)
val rdd = sqlContext.read.parquet(featurePath).select("features")
.rdd.map(x=>x(0).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector])
.map(x=>x.indices)
.zipWithIndex()
.map(x=>{
for (i <- x._1) yield {
(i, x._2) // (item index, user row index), keyed by item as in the libsvm variant
}
})
.flatMap(x=>x)
.map(x=>(x._1,BitSet(x._2.toInt)))
.reduceByKey(_.union(_))
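// Pairwise Jaccard between the BitSets: |intersection| / |union|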
val entries = rdd.cartesian(rdd).map {
case ((key0, set0), (key1, set1)) => {
val j = (set0 & (set1)).size
val q = set0.union(set1).size
val re = j.toDouble / q
MatrixEntry(key0.toInt, key1.toInt, re)
}
}
val simMat: CoordinateMatrix = new CoordinateMatrix(entries)
simMat
}
def computeJaccardSimWithLibSVM(sc: SparkContext,
featurePath: String): CoordinateMatrix ={
val rdd = sc.textFile(featurePath)
.map(_.split(" ", 2)(1))
.zipWithIndex()
.map(x => (x._2.toInt,x._1.split(" ", -1)))
.map(x=>{
for (i <- x._2) yield {
(i.split("\\:")(0), x._1)
}
}).flatMap(x=>x)
.map(x=>(x._1,BitSet(x._2.toString.toInt)))
.reduceByKey(_.union(_))
val entries = rdd.cartesian(rdd).map {
case((key0,set0),(key1,set1))=>{
val j = (set0 &(set1)).size
val q = set0.union(set1).size
val re = j.toDouble/q
MatrixEntry(key0.toInt,key1.toInt,re)
}
}
val simMat: CoordinateMatrix = new CoordinateMatrix(entries)
simMat
}
def computeCosSimWithDF(sc: SparkContext,
featurePath: String): CoordinateMatrix ={
val sqlContext = new SQLContext(sc)
val rdd = sqlContext.read.parquet(featurePath).select("features")
.rdd.map(x=>x(0).asInstanceOf[org.apache.spark.mllib.linalg.Vector])
val mat = new RowMatrix(rdd)
val simMat = mat.columnSimilarities()
simMat
}
/**
* Compute the cosine similarity matrix from libsvm-format features.
* @param sc
* @param featureSize number of features (items)
* @param featurePath
*/
def computeCosSimWithLibSVM(sc: SparkContext,
featureSize: Int,
featurePath: String): CoordinateMatrix ={
val rows = sc.textFile(featurePath).map(_.split(" "))
.map(x=>(x.filter(g=>g.contains(":"))))
.map(x=>(x.map(_.split(":")).map(ar => (ar(0).toInt,ar(1).toDouble))))
.map(x=>(Vectors.sparse(featureSize,x)))
val mat = new RowMatrix(rows)
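// columnSimilarities computes the cosine similarity between every pair of columns (items)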
val simMat = mat.columnSimilarities()
simMat
}
/**
* Load the similarity matrix from the saved text files.
* @param sc
* @param simPath
* @param featuresSize
* @return
*/
def loadSimMatrix(sc: SparkContext,
simPath: String,
featuresSize: Int
): SparseMatrix ={
val entries = sc.textFile(simPath)
.map(_.split("\\|", -1))
.map(x=>(x(0).toInt, x(1).toInt, x(2).toDouble))
.collect()
val simMatrix = SparseMatrix.fromCOO(featuresSize, featuresSize, entries)
simMatrix
}
/**
* Save the similarity matrix to a text file.
* @param sc
* @param savePath
* @param mat
*/
def saveSimMatrix(sc: SparkContext,
savePath: String,
mat: CoordinateMatrix): Unit ={
val sim = mat
sim.entries.map(x=>x.i+"|"+x.j+"|"+x.value).coalesce(1).saveAsTextFile(savePath)
}
/**
* Get the similarity vector of an item (by item index) from the similarity matrix.
* @param simMatrix
* @param itemNum
* @return
*/
def getSimVecFromMatrix(simMatrix: SparseMatrix, itemNum: Int):Array[Double] ={
val arr1 = for (i <- 0 until itemNum) yield {
simMatrix(i, itemNum)
}
val arr2 = for (i <- itemNum until simMatrix.numRows) yield {
simMatrix(itemNum, i)
}
(arr1 ++ arr2).toArray
}
/**
* For every user, compute the score for recommending a single item based on that item's similarity vector; output is sorted by score in descending order.
* @param sc
* @param sim
* @param featurePath
* @return
*/
def predictBySimVecWithLibSVM(sc: SparkContext,
sim: Array[Double],
featurePath: String): RDD[(String, Double)] ={
sc.textFile(featurePath).map(_.split(" ")).map(x=>{
val id = x(0)
var score = 0.0
for (i <- 1 until x.length){
val idx = x(i).split(":")(0)
val value = x(i).split(":")(1)
score += value.toDouble * sim(idx.toInt)
}
(id,score)
}).sortBy(_._2,false)
}
/**
* For every user, compute the score for recommending a single item based on that item's similarity vector; output is sorted by score in descending order.
* @param sc
* @param sim
* @param featurePath
* @return
*/
def predictBySimVecWithDF(sc: SparkContext,
sim: Array[Double],
featurePath: String): RDD[(String, Double)] ={
val sqlContext = new SQLContext(sc)
sqlContext.read.parquet(featurePath).select("id","features")
.rdd.map(x=>{
val p = x(0).toString
val v = x(1).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector]
val idxs = v.toSparse.indices
val values = v.toSparse.values
var score = 0.0
for (i <- 0 until idxs.length){
score += values(i) * sim(idxs(i))
}
(p,score)
})
.sortBy(_._2,false)
}
}
object ItemCF extends ItemCF{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("utils").setMaster("local[8]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val libsvmFeaturePath = "..//data//libSVMPath"
val dfFeaturePath = "..//data//DFPath"
val simPath = "..//data//SimPath"
val featureSize = 3953
testComputeSim()
testSaveAndLoadSimMatrix()
def testComputeSim(): Unit ={
println("Test Compute Jaccard Sim With LibSVM ")
val sim1 = computeJaccardSimWithLibSVM(sc,libsvmFeaturePath)
sim1.entries.take(3).foreach(println)
println("Test Compute Cossin Sim With LibSVM ")
val sim2 = computeCosSimWithLibSVM(sc,featureSize,libsvmFeaturePath)
sim2.entries.take(3).foreach(println)
println("Test Compute Jaccard Sim With DataFrame ")
val sim3 = computeJaccardSimWithDF(sc,dfFeaturePath)
sim3.entries.take(3).foreach(println)
println("Test Compute Cossin Sim With DataFrame ")
val sim4 = computeCosSimWithDF(sc,dfFeaturePath)
sim4.entries.take(3).foreach(println)
}
def testSaveAndLoadSimMatrix(): Unit ={
val sim = computeCosSimWithLibSVM(sc,featureSize,libsvmFeaturePath)
saveSimMatrix(sc,simPath,sim)
println("Save The SimMatrix")
val simLoad = loadSimMatrix(sc, simPath, featureSize)
println(s"Load The SimMatrix. The Row Num Is ${simLoad.numRows} The Col Num Is ${simLoad.numCols}")
}
def testPredict(): Unit ={
val simMatrix = loadSimMatrix(sc, simPath, featureSize)
println(s"Load The SimMatrix. The Row Num Is ${simMatrix.numRows} The Col Num Is ${simMatrix.numCols}")
val itemNum = 800
val simVec = getSimVecFromMatrix(simMatrix,itemNum)
println("Test Predict By SimVec With LibSVM ")
val score1 = predictBySimVecWithLibSVM(sc, simVec, libsvmFeaturePath)
score1.take(3).foreach(println)
println("Test Predict By SimVec With DataFrame ")
val score2 = predictBySimVecWithDF(sc, simVec, dfFeaturePath)
score2.take(3).foreach(println)
}
}
}
package com.apachecn.recommand.colfliter
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by ych on 2018/9/20.
*/
object sparkUtils {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("utils").setMaster("local")
val sc = new SparkContext(conf)
}
}
package apache.wiki
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author ${user.name}
*/
object App {
// def foo(x : Array[String]) = x.foldLeft("")((a,b) => a + b)
// def main(args : Array[String]) {
// println( "Hello World!" )
// println("concat arguments = " + foo(args))
// }
def main(args: Array[String]) {
// Initialize the SparkContext; configuration is supplied through SparkConf
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
println("this system exit ok!!!")
// Only one SparkContext may be active per JVM. Call stop() on the active SparkContext before creating a new one.
sc.stop()
}
}
package apache.wiki
import java.io.File
import java.nio.charset.Charset
import com.google.common.io.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.{IntParam, LongAccumulator}
/**
* @author ${user.name}
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
*
* See LICENSE file for further information.
*
* References:
* GitHub: https://github.com/apachecn/RecommenderSystems
* https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
*/
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
return instance
}
}
/**
* Use this singleton to get or register an Accumulator.
*/
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
return instance
}
}
object CheckPointWordCount{
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
// If a checkpoint already exists, this method is not invoked
println("Creating new context")
// Location where the streaming results are stored
val outputFile = new File(outputPath.split(":")(1))
if (outputFile.exists()) outputFile.delete()
val conf = new SparkConf().setAppName("CheckPointWordCount")
// Run in local mode by default
val isDebug = true
if (isDebug) {
conf.setMaster("local[2]")
}
// Create the context with a 10 second batch interval
val ssc = new StreamingContext(conf, Seconds(10))
// Checkpoint directory
ssc.checkpoint(checkpointDirectory)
// Create a DStream that connects to hostname:port, e.g. localhost:9999
val lines = ssc.socketTextStream(ip, port)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
/*
* Use the accumulator to count the total occurrences of words from blacklist.value
*/
val counts = rdd.filter { case (word, count) =>
printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
println("return false")
false
} else {
println("return true")
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
println(output)
println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(output + "\n", outputFile, Charset.defaultCharset())
}
return ssc
}
def main(args: Array[String]): Unit = {
val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
// Set up checkpointing
val (ip, port, outputPath, checkpointDir) = ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(ip, port, outputPath, checkpointDir))
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
package apache.wiki
import org.apache.spark._
import org.apache.spark.streaming._
/**
* @author ${user.name}
* Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
*
* See LICENSE file for further information.
*
* References:
* Recommender systems: http://www.kuqin.com/shuoit/20151202/349305.html
* ALS notes: http://www.csdn.net/article/2015-05-07/2824641
*/
object StreamingWordCount{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that connects to hostname:port, e.g. localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
// Note: an action must be triggered (beginners often forget this and hit: No output operations registered, so nothing to execute)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
package apache.wiki
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author ${user.name}
*/
object WordCount {
def main(args: Array[String]) {
// Initialize the SparkContext; configuration is supplied through SparkConf
val conf = new SparkConf().setMaster("local").setAppName("My app") //.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
// // Validate the input arguments
// if (args.length < 1) {
// println("USAGE:")
// println("spark-submit ... xxx.jar Date_String [Iteration]")
// println("spark-submit ... xxx.jar 20160424 10")
// sys.exit()
// }
val lines = sc.textFile("file:/opt/git/RecommenderSystems/README.md")
lines.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_+_)
.map(x => (x._2, x._1))
.sortByKey(false)
.map(x => (x._2, x._1))
.saveAsTextFile("file:/opt/git/RecommenderSystems/output/result.log")
// println("this system exit ok!!!")
// Only one SparkContext may be active per JVM. Call stop() on the active SparkContext before creating a new one.
sc.stop()
}
}
/*
* Copyright 2001-2009 Artima, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package samples
/*
ScalaTest facilitates different styles of testing by providing traits you can mix
together to get the behavior and syntax you prefer. A few examples are
included here. For more information, visit:
http://www.scalatest.org/
One way to use ScalaTest is to help make JUnit or TestNG tests more
clear and concise. Here's an example:
*/
import scala.collection.mutable.Stack
import org.scalatest.Assertions
import org.junit.Test
class StackSuite extends Assertions {
@Test def stackShouldPopValuesInLastInFirstOutOrder() {
val stack = new Stack[Int]
stack.push(1)
stack.push(2)
assert(stack.pop() === 2)
assert(stack.pop() === 1)
}
@Test def stackShouldThrowNoSuchElementExceptionIfAnEmptyStackIsPopped() {
val emptyStack = new Stack[String]
intercept[NoSuchElementException] {
emptyStack.pop()
}
}
}
/*
Here's an example of a FunSuite with ShouldMatchers mixed in:
*/
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class ListSuite extends FunSuite with ShouldMatchers {
test("An empty list should be empty") {
List() should be ('empty)
Nil should be ('empty)
}
test("A non-empty list should not be empty") {
List(1, 2, 3) should not be ('empty)
List("fee", "fie", "foe", "fum") should not be ('empty)
}
test("A list's length should equal the number of elements it contains") {
List() should have length (0)
List(1, 2) should have length (2)
List("fee", "fie", "foe", "fum") should have length (4)
}
}
/*
ScalaTest also supports the behavior-driven development style, in which you
combine tests with text that specifies the behavior being tested. Here's
an example whose text output when run looks like:
A Map
- should only contain keys and values that were added to it
- should report its size as the number of key/value pairs it contains
*/
import org.scalatest.FunSpec
import scala.collection.mutable.Stack
class ExampleSpec extends FunSpec {
describe("A Stack") {
it("should pop values in last-in-first-out order") {
val stack = new Stack[Int]
stack.push(1)
stack.push(2)
assert(stack.pop() === 2)
assert(stack.pop() === 1)
}
it("should throw NoSuchElementException if an empty stack is popped") {
val emptyStack = new Stack[Int]
intercept[NoSuchElementException] {
emptyStack.pop()
}
}
}
}
package samples
import org.junit.runner.RunWith
import org.specs2.mutable._
import org.specs2.runner._
/**
* Sample specification.
*
* This specification can be executed with: scala -cp <your classpath> ${package}.SpecsTest
* Or using maven: mvn test
*
* For more information on how to write or run specifications, please visit:
* http://etorreborre.github.com/specs2/guide/org.specs2.guide.Runners.html
*
*/
@RunWith(classOf[JUnitRunner])
class MySpecTest extends Specification {
"The 'Hello world' string" should {
"contain 11 characters" in {
"Hello world" must have size(11)
}
"start with 'Hello'" in {
"Hello world" must startWith("Hello")
}
"end with 'world'" in {
"Hello world" must endWith("world")
}
}
}
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 14 09:07:33 2018
@author: ych
E-mail:yao544303963@gmail.com
"""
import random
import sys
import math
from operator import itemgetter
random.seed(0)
class ItemCF(object):
def __init__(self):
self.trainset = {}
self.testset = {}
self.n_sim_movie = 20
self.n_rec_movie = 10
self.movie_sim_mat = {}
self.movie_popular = {}
self.movie_count = 0
print('Similar movie number = %d' % self.n_sim_movie, file=sys.stderr)
print('Recommended movie number = %d' % self.n_rec_movie, file=sys.stderr)
def generate_dataset(self, filename, pivot=0.7):
trainset_len = 0
testset_len = 0
fp = open(filename, 'r')
for line in fp:
user, movie, rating, _ = line.split('::')
if random.random() < pivot:
self.trainset.setdefault(user, {})
self.trainset[user][movie] = int(rating)
trainset_len += 1
else:
self.testset.setdefault(user, {})
self.testset[user][movie] = int(rating)
testset_len += 1
print('split succ , trainset is %d , testset is %d' % (trainset_len, testset_len), file=sys.stderr)
def calc_movie_sim(self):
for user, movies in self.trainset.items():
for movie in movies:
if movie not in self.movie_popular:
self.movie_popular[movie] = 0
self.movie_popular[movie] += 1
print('count movies number and popularity succ', file=sys.stderr)
self.movie_count = len(self.movie_popular)
print('total movie number = %d' % self.movie_count, file=sys.stderr)
itemsim_mat = self.movie_sim_mat
print('building co-rated users matrix', file=sys.stderr)
for user, movies in self.trainset.items():
for m1 in movies:
for m2 in movies:
if m1 == m2:
continue
itemsim_mat.setdefault(m1, {})
itemsim_mat[m1].setdefault(m2, 0)
itemsim_mat[m1][m2] += 1
print('build co-rated users matrix succ', file=sys.stderr)
print('calculating movie similarity matrix', file=sys.stderr)
simfactor_count = 0
PRINT_STEP = 2000000
for m1, related_movies in itemsim_mat.items():
for m2, count in related_movies.items():
itemsim_mat[m1][m2] = count / math.sqrt(self.movie_popular[m1] * self.movie_popular[m2])
simfactor_count += 1
if simfactor_count % PRINT_STEP == 0:
print('calculating movie similarity factor(%d)' % simfactor_count, file=sys.stderr)
print('calculate movie similarity succ', file=sys.stderr)
def recommend(self, user):
K = self.n_sim_movie
N = self.n_rec_movie
rank = {}
watched_movies = self.trainset[user]
for movie, rating in watched_movies.items():
for related_movie, similarity_factor in sorted(self.movie_sim_mat[movie].items(), key=itemgetter(1),
reverse=True)[0:K]:
if related_movie in watched_movies:
continue
rank.setdefault(related_movie, 0)
rank[related_movie] += similarity_factor * rating
return sorted(rank.items(), key=itemgetter(1), reverse=True)[0:N]
def evaluate(self):
print('evaluation start', file=sys.stderr)
N = self.n_rec_movie
hit = 0
rec_count = 0
test_count = 0
all_rec_movies = set()
popular_sum = 0
for i, user in enumerate(self.trainset):
if i % 500 == 0:
print('recommend for %d users ' % i, file=sys.stderr)
test_movies = self.testset.get(user, {})
rec_movies = self.recommend(user)
for movie, _ in rec_movies:
if movie in test_movies:
hit += 1
all_rec_movies.add(movie)
popular_sum += math.log(1 + self.movie_popular[movie])
rec_count += N
test_count += len(test_movies)
precision = hit / (1.0 * rec_count)
recall = hit / (1.0 * test_count)
coverage = len(all_rec_movies) / (1.0 * self.movie_count)
popularity = popular_sum / (1.0 * rec_count)
print('precision is %.4f\t recall is %.4f \t coverage is %.4f \t popularity is %.4f'
% (precision, recall, coverage, popularity), file=sys.stderr)
if __name__ == '__main__':
ratingfile = "../../data/ratings"
item_cf = ItemCF()
item_cf.generate_dataset(ratingfile)
item_cf.calc_movie_sim()
item_cf.evaluate()
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 14 11:11:25 2018
@author: ych
E-mail:yao544303963@gmail.com
"""
from sklearn.externals.joblib import Memory
from sklearn.datasets import load_svmlight_file
from sklearn.metrics import jaccard_similarity_score
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
np.set_printoptions(suppress=True)
mem = Memory("./mycache")
# Input data format:
# user movie1:rating1 movie2:rating2
@mem.cache
def get_data(filename):
data = load_svmlight_file(filename)
return data[0], data[1]
# Compute Jaccard similarity
def get_jaccard_similarity(X):
n = X.T.shape[1]
similarity = np.zeros([n, n])
for i in range(n):
v1 = X.T[i].toarray()
for j in range(i + 1, n):
v2 = X.T[j].toarray()
sim = jaccard_similarity_score(v1, v2)
similarity[i][j] = sim
similarity[j][i] = sim
return similarity
# Compute cosine similarity
def get_cosine_similarity(X):
similarity = cosine_similarity(X)
return similarity
filename = "../../data/ratingslibsvm"
X, y = get_data(filename)
cosine_sim = get_cosine_similarity(X)
print(cosine_sim)
jaccard_sim = get_jaccard_similarity(X)
print(jaccard_sim)
# -*- coding: utf-8 -*-
"""
Created on Mon Sep 17 10:55:09 2018
@author: ych
E-mail:yao544303963@gmail.com
"""
import sys
import random
import math
import os
from operator import itemgetter
random.seed(0)
class UserCF(object):
''' TopN recommendation - User Based Collaborative Filtering '''
def __init__(self):
self.trainset = {}
self.testset = {}
self.n_sim_user = 20
self.n_rec_movie = 10
self.user_sim_mat = {}
self.movie_popular = {}
self.movie_count = 0
print ('Similar user number = %d' % self.n_sim_user, file=sys.stderr)
print ('recommended movie number = %d' %
self.n_rec_movie, file=sys.stderr)
def generate_dataset(self, filename, pivot=0.7):
''' load rating data and split it to training set and test set '''
trainset_len = 0
testset_len = 0
fp = open(filename, 'r')
for line in fp:
user, movie, rating, _ = line.split('::')
# split the data by pivot
if random.random() < pivot:
self.trainset.setdefault(user, {})
self.trainset[user][movie] = int(rating)
trainset_len += 1
else:
self.testset.setdefault(user, {})
self.testset[user][movie] = int(rating)
testset_len += 1
print ('split training set and test set succ', file=sys.stderr)
print ('train set = %s' % trainset_len, file=sys.stderr)
print ('test set = %s' % testset_len, file=sys.stderr)
def calc_user_sim(self):
''' calculate user similarity matrix '''
# build inverse table for item-users
# key=movieID, value=list of userIDs who have seen this movie
print ('building movie-users inverse table...', file=sys.stderr)
movie2users = dict()
for user, movies in self.trainset.items():
for movie in movies:
# inverse table for item-users
if movie not in movie2users:
movie2users[movie] = set()
movie2users[movie].add(user)
# count item popularity at the same time
if movie not in self.movie_popular:
self.movie_popular[movie] = 0
self.movie_popular[movie] += 1
print ('build movie-users inverse table succ', file=sys.stderr)
# save the total movie number, which will be used in evaluation
self.movie_count = len(movie2users)
print ('total movie number = %d' % self.movie_count, file=sys.stderr)
# count co-rated items between users
usersim_mat = self.user_sim_mat
print ('building user co-rated movies matrix...', file=sys.stderr)
for movie, users in movie2users.items():
for u in users:
for v in users:
if u == v:
continue
usersim_mat.setdefault(u, {})
usersim_mat[u].setdefault(v, 0)
usersim_mat[u][v] += 1
print ('build user co-rated movies matrix succ', file=sys.stderr)
# calculate similarity matrix
print ('calculating user similarity matrix...', file=sys.stderr)
simfactor_count = 0
PRINT_STEP = 2000000
for u, related_users in usersim_mat.items():
for v, count in related_users.items():
usersim_mat[u][v] = count / math.sqrt(
len(self.trainset[u]) * len(self.trainset[v]))
simfactor_count += 1
if simfactor_count % PRINT_STEP == 0:
print ('calculating user similarity factor(%d)' %
simfactor_count, file=sys.stderr)
print ('calculate user similarity matrix(similarity factor) succ',
file=sys.stderr)
print ('Total similarity factor number = %d' %
simfactor_count, file=sys.stderr)
def recommend(self, user):
''' Find K similar users and recommend N movies. '''
K = self.n_sim_user
N = self.n_rec_movie
rank = dict()
watched_movies = self.trainset[user]
for similar_user, similarity_factor in sorted(self.user_sim_mat[user].items(),
key=itemgetter(1), reverse=True)[0:K]:
for movie in self.trainset[similar_user]:
if movie in watched_movies:
continue
# predict the user's "interest" for each movie
rank.setdefault(movie, 0)
rank[movie] += similarity_factor
# return the N best movies
return sorted(rank.items(), key=itemgetter(1), reverse=True)[0:N]
def evaluate(self):
''' print evaluation result: precision, recall, coverage and popularity '''
print ('Evaluation start...', file=sys.stderr)
N = self.n_rec_movie
# variables for precision and recall
hit = 0
rec_count = 0
test_count = 0
# variables for coverage
all_rec_movies = set()
# variables for popularity
popular_sum = 0
for i, user in enumerate(self.trainset):
if i % 500 == 0:
print ('recommended for %d users' % i, file=sys.stderr)
test_movies = self.testset.get(user, {})
rec_movies = self.recommend(user)
for movie, _ in rec_movies:
if movie in test_movies:
hit += 1
all_rec_movies.add(movie)
popular_sum += math.log(1 + self.movie_popular[movie])
rec_count += N
test_count += len(test_movies)
precision = hit / (1.0 * rec_count)
recall = hit / (1.0 * test_count)
coverage = len(all_rec_movies) / (1.0 * self.movie_count)
popularity = popular_sum / (1.0 * rec_count)
print ('precision=%.4f\trecall=%.4f\tcoverage=%.4f\tpopularity=%.4f' %
(precision, recall, coverage, popularity), file=sys.stderr)
if __name__ == '__main__':
ratingfile = "../../data/ratings"
usercf = UserCF()
usercf.generate_dataset(ratingfile)
usercf.calc_user_sim()
usercf.evaluate()