OpenDocCN / RecommenderSystems

Commit d31d19f1
Authored Oct 16, 2018 by Yao544303
Commit message: add
Parent: cd9091a8
19 changed files, with 254 additions and 438 deletions (+254, -438)
Changed files:

- README.md (+28, -19)
- manual/资料汇总.md (+0, -0)
- spark/src/main/scala/com/apachecn/recommand/als/App.scala (+0, -27)
- spark/src/main/scala/com/apachecn/recommand/als/CheckPointWordCount.scala (+0, -128)
- spark/src/main/scala/com/apachecn/recommand/als/StreamingWordCount.scala (+0, -39)
- spark/src/main/scala/com/apachecn/recommand/als/WordCount.scala (+0, -39)
- spark/src/main/scala/com/apachecn/recommand/colfliter/Features.scala (+58, -23)
- spark/src/main/scala/com/apachecn/recommand/colfliter/ItemCF.scala (+153, -13)
- spark/src/main/scala/com/apachecn/recommand/colfliter/sparkUtils.scala (+0, -5)
- spark/src/test/scala/samples/scalatest.scala (+0, -109)
- spark/src/test/scala/samples/specs.scala (+0, -31)
- standalone/col_filtering/item_cf.py (+1, -1)
- standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/5df639254df90ffb4b58eba85a36303c/metadata.json (+1, -0)
- standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/5df639254df90ffb4b58eba85a36303c/output.pkl (+0, -0)
- standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/9c0e4967ea839d4f938fcd3dc25572d0/metadata.json (+1, -0)
- standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/9c0e4967ea839d4f938fcd3dc25572d0/output.pkl (+0, -0)
- standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/func_code.py (+5, -0)
- standalone/col_filtering/similarity_by_sklearn.py (+6, -3)
- standalone/col_filtering/user_cf.py (+1, -1)
README.md (changed: +28, -19)

```diff
@@ -3,13 +3,16 @@
 However, the book was written some years ago, before today's big-data technologies were as widespread as they are now, so its demo code is not suited to running on large datasets.
 Hence the idea of reimplementing the relevant material with Spark.
-# Directory layout
-data        test datasets
-standalone  single-machine implementations (mainly Python)
-spark       Spark implementations (mainly Scala)
-# Recommendation algorithm implementations (planned)
-## Based on user behavior data
+# Directory layout
+data        test datasets
+standalone  single-machine implementations (mainly Python)
+spark       Spark implementations (mainly Scala)
+manual      collected reference material
+# Planned items (i.e. holes to be filled)
+## Recommendation algorithm implementations
+### Recommendation algorithms based on user behavior data
 ItemCF
 UserCF
 Association rules
@@ -17,7 +20,7 @@ LFM
 Graph
 ALS
-## Using user tag data
+# Recommendation algorithms using user tag data
 LDA
 TFIDF
 TagCF
@@ -27,18 +30,24 @@ Markov Chain
 Social networks
 ....
-# Recommender system architecture implementation
-## Peripheral architecture
-### User behavior log storage system
-### Logging system
-### UI
-## Functional modules
-### Data ingestion module
-### User feature generation module
-### Recommendation module
-### Filtering module
-### Ranking module
-## Paper reading notes
-## Evaluation system implementation
+## Recommender system architecture implementation
+### Peripheral architecture
+#### User behavior log storage system
+#### Logging system
+#### UI
+### Functional modules
+#### Data ingestion module
+#### User feature generation module
+#### Recommendation module
+#### Filtering module
+#### Ranking module
```
manual/资料汇总.md (new file: 0 → 100644; no content shown in this view)
spark/src/main/scala/com/apachecn/recommand/als/App.scala (deleted: 100644 → 0)

```scala
package apache.wiki

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author ${user.name}
 */
object App {
//  def foo(x: Array[String]) = x.foldLeft("")((a, b) => a + b)
//  def main(args: Array[String]) {
//    println("Hello World!")
//    println("concat arguments = " + foo(args))
//  }
  def main(args: Array[String]) {
    // Initialize the SparkContext; configuration is supplied through SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    println("this system exit ok!!!")
    // Only one SparkContext may be active per JVM: call stop() on the active
    // one before creating a new SparkContext.
    sc.stop()
  }
}
```
spark/src/main/scala/com/apachecn/recommand/als/CheckPointWordCount.scala (deleted: 100644 → 0)

```scala
package apache.wiki

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.{IntParam, LongAccumulator}

/**
 * @author ${user.name}
 * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
 *
 * See LICENSE file for further information.
 *
 * References:
 * GitHub: https://github.com/apachecn/RecommenderSystems
 * https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 */
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    return instance
  }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    return instance
  }
}

object CheckPointWordCount {

  def createContext(ip: String, port: Int, outputPath: String,
                    checkpointDirectory: String): StreamingContext = {
    // This method is not entered if a checkpoint already exists
    println("Creating new context")
    // Where the streaming results are written
    val outputFile = new File(outputPath.split(":")(1))
    if (outputFile.exists()) outputFile.delete()
    val conf = new SparkConf().setAppName("CheckPointWordCount")
    // Run in local mode by default
    val isDebug = true
    if (isDebug) {
      conf.setMaster("local[2]")
    }
    // Create the context with a 10 second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    // Where checkpoints are stored
    ssc.checkpoint(checkpointDirectory)

    // Create a DStream that will connect to hostname:port, e.g. localhost:9999
    val lines = ssc.socketTextStream(ip, port)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      /*
       * The accumulator totals the occurrences of words in blacklist.value
       */
      val counts = rdd.filter { case (word, count) =>
        printf("blacklist.value=%s, word=%s, count=%d\n", blacklist.value, word, count)
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          println("return false")
          false
        } else {
          println("return true")
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
      println(output)
      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(output + "\n", outputFile, Charset.defaultCharset())
    }
    return ssc
  }

  def main(args: Array[String]): Unit = {
    val base = if (args.length > 0) args(0) else "file:/opt/git/RecommenderSystems/"
    // Configure checkpointing
    val (ip, port, outputPath, checkpointDir) =
      ("localhost", 9999, base + "output/out", base + "output/checkpointDir")
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => createContext(ip, port, outputPath, checkpointDir))
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }
}
```
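A note on the recovery pattern above: on restart, `StreamingContext.getOrCreate` rebuilds the context from the checkpoint directory instead of invoking `createContext` again, so driver-side references do not survive. That is why the blacklist broadcast and the dropped-words accumulator are fetched inside `foreachRDD` through lazily initialized singletons rather than captured in the closure.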
spark/src/main/scala/com/apachecn/recommand/als/StreamingWordCount.scala (deleted: 100644 → 0)

```scala
package apache.wiki

import org.apache.spark._
import org.apache.spark.streaming._

/**
 * @author ${user.name}
 * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
 *
 * See LICENSE file for further information.
 *
 * References:
 * Recommender systems: http://www.kuqin.com/shuoit/20151202/349305.html
 * ALS overview: http://www.csdn.net/article/2015-05-07/2824641
 */
object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, e.g. localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console.
    // Note: an output action must be triggered here; beginners often forget one and hit
    // "No output operations registered, so nothing to execute".
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }
}
```
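To try this example locally, a text source must already be listening on localhost:9999 when the job starts; the upstream Spark streaming examples use `nc -lk 9999` for that purpose.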
spark/src/main/scala/com/apachecn/recommand/als/WordCount.scala (deleted: 100644 → 0)

```scala
package apache.wiki

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author ${user.name}
 */
object WordCount {
  def main(args: Array[String]) {
    // Initialize the SparkContext; configuration is supplied through SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("My app")
    //.set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

//    // Validate input arguments
//    if (args.length < 1) {
//      println("USAGE:")
//      println("spark-submit ... xxx.jar Date_String [Iteration]")
//      println("spark-submit ... xxx.jar 20160424 10")
//      sys.exit()
//    }

    val lines = sc.textFile("file:/opt/git/RecommenderSystems/README.md")
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(false)
      .map(x => (x._2, x._1))
      .saveAsTextFile("file:/opt/git/RecommenderSystems/output/result.log")

//    println("this system exit ok!!!")
    // Only one SparkContext may be active per JVM: call stop() on the active
    // one before creating a new SparkContext.
    sc.stop()
  }
}
```
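In the pipeline above, the two `map` calls around `sortByKey(false)` swap each `(word, count)` pair to `(count, word)` and back: pair RDDs can only sort by key, so the double swap is what yields words ordered by descending frequency.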
spark/src/main/scala/com/apachecn/recommand/colfliter/Features.scala (changed: +58, -23)

```diff
 package com.apachecn.recommand.colfliter
 
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 
 /**
  * Created by ych on 2018/9/20.
  */
-object Features {
+class Features {
+
+  /**
+   * Turn the ratings into a DataFrame of (id, features)
+   */
+  def changeRating2DF(sc: SparkContext, ratingsPath: String, size: Int, OutPath: String): Unit = {
+    val sqlContext = new SQLContext(sc)
+    val ratingsRdd = sc.textFile(ratingsPath)
+      .map(_.split("::"))
+      .map(x => (x(0), Array((x(1).toInt, x(2).toDouble))))
+      .reduceByKey(_ ++ _)
+      .map(x => (x._1, Vectors.sparse(size, x._2)))
+    val df = sqlContext.createDataFrame(ratingsRdd).toDF("id", "features")
+      .write.parquet(OutPath)
+  }
+
+  /**
+   * Convert the input ratings to libsvm format.
+   * For example, the input
+   *   1::661::3::978302109
+   *   1::914::3::978301968
+   * becomes
+   *   1 661:3 914:3
+   */
+  def changeRatings2LibSVM(sc: SparkContext, ratingsPath: String, ratingsLibSVMPath: String): Unit = {
+    val ratingsRdd = sc.textFile(ratingsPath)
+      .map(_.split("::"))
+      .map(x => (x(0), Array((x(1).toInt, x(2).toInt))))
+      .reduceByKey(_ ++ _)
+      .map(x => (x._1 + " " + x._2.sortBy(_._1).map(x => f"${x._1}:${x._2}").mkString(" ")))
+      .saveAsTextFile(ratingsLibSVMPath)
+  }
+}
+
+object Features {
   def main(args: Array[String]): Unit = {
-    val conf = new SparkConf().setAppName("Features Prepare").setMaster("local")
-    val sc = new SparkContext(conf)
-    val ratingsPath = "C:\\dtworkspace\\recommand\\data\\ratings"
-    val ratingsLibSVMPath = "C:\\dtworkspace\\recommand\\data\\ratingslibsvm"
-    /**
-     * Convert the input ratings to a sparse matrix.
-     * For example, the input
-     *   1::661::3::978302109
-     *   1::914::3::978301968
-     * becomes
-     *   1 661:3 914:3
-     */
-    def changeRatings2LibSVM(): Unit = {
-      val ratingsRdd = sc.textFile(ratingsPath)
-        .map(_.split("::"))
-        .map(x => (x(0), Array((x(1).toInt, x(2).toInt))))
-        .reduceByKey(_ ++ _)
-        .map(x => (x._1 + " " + x._2.sortBy(_._1).map(x => f"${x._1}:${x._2}").mkString(" ")))
-        .coalesce(1).saveAsTextFile(ratingsLibSVMPath)
-    }
+    /**
+     * Submit command: spark-submit --master yarn-cluster <ratingsPath> <libSVMOutPath> <dfOutPath> <featureSize>
+     */
+    val conf = new SparkConf().setAppName("Features Prepare")
+    val sc = new SparkContext(conf)
+    val ratingsPath = args(0)
+    val libSVMOutPath = args(1)
+    val dfOutPath = args(2)
+    val featureSize = args(3).toInt
+
+    val testF = new Features
+    testF.changeRatings2LibSVM(sc, ratingsPath, libSVMOutPath)
+    testF.changeRating2DF(sc, ratingsPath, featureSize, dfOutPath)
   }
 }
```
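For reference, below is a minimal Spark-free sketch of the grouping that `changeRatings2LibSVM` performs, applied to the two sample lines from its comment. The `LibSVMSketch` object is a hypothetical name, and `groupMapReduce` (Scala 2.13) stands in for Spark's `reduceByKey`:

```scala
// Plain-Scala sketch of the libsvm conversion (assumes Scala 2.13 for groupMapReduce).
object LibSVMSketch extends App {
  val lines = Seq("1::661::3::978302109", "1::914::3::978301968")
  val rows = lines
    .map(_.split("::"))
    .map(x => (x(0), List((x(1).toInt, x(2).toInt))))      // (userId, [(itemId, rating)])
    .groupMapReduce(_._1)(_._2)(_ ++ _)                    // same effect as reduceByKey(_ ++ _)
    .map { case (user, items) =>
      user + " " + items.sortBy(_._1).map { case (i, r) => s"$i:$r" }.mkString(" ")
    }
  rows.foreach(println) // prints: 1 661:3 914:3
}
```

Sorting by item id before joining keeps the columns in ascending order, which the svmlight format expects.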
spark/src/main/scala/com/apachecn/recommand/colfliter/ItemCF.scala (changed: +153, -13)

```diff
 package com.apachecn.recommand.colfliter
 
 import breeze.linalg.SparseVector
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.mllib.linalg.{DenseMatrix, SparseMatrix, Vectors}
 import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import scala.collection.BitSet
@@ -10,12 +14,40 @@ import scala.collection.BitSet
 /**
  * Item-based collaborative filtering
  */
 class ItemCF {
-  /**
-   * Compute the Jaccard distance using BitSet
-   */
-  def computeJaccardSim(sc: SparkContext, pathIn: String): RDD[(String, Double)] = {
-    val rdd = sc.textFile(pathIn)
+  def computeJaccardSimWithDF(sc: SparkContext, featurePath: String): CoordinateMatrix = {
+    val sqlContext = new SQLContext(sc)
+    val rdd = sqlContext.read.parquet(featurePath).select("features")
+      .rdd
+      .map(x => x(0).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector])
+      .map(x => (x.indices))
+      .zipWithIndex()
+      .map(x => {
+        for (i <- x._1) yield {
+          (x._2, i)
+        }
+      })
+      .flatMap(x => x)
+      .map(x => (x._1, BitSet(x._2.toString.toInt)))
+      .reduceByKey(_.union(_))
+    val entries = rdd.cartesian(rdd).map {
+      case ((key0, set0), (key1, set1)) => {
+        val j = (set0 & (set1)).size
+        val q = set0.union(set1).size
+        val re = j.toDouble / q
+        MatrixEntry(key0.toInt, key1.toInt, re)
+      }
+    }
+    val simMat: CoordinateMatrix = new CoordinateMatrix(entries)
+    simMat
+  }
+
+  def computeJaccardSimWithLibSVM(sc: SparkContext, featurePath: String): CoordinateMatrix = {
+    val rdd = sc.textFile(featurePath)
+      .map(_.split(" ", 2)(1))
+      .zipWithIndex()
+      .map(x => (x._2.toInt, x._1.split(" ", -1)))
@@ -24,25 +56,133 @@ class ItemCF {
           (i.split("\\:")(0), x._1)
         }
       }).flatMap(x => x)
       .map(x => (x._1, BitSet(x._2.toString.toInt)))
       .reduceByKey(_.union(_))
-    val re = rdd.cartesian(rdd).map {
+    val entries = rdd.cartesian(rdd).map {
       case ((key0, set0), (key1, set1)) => {
-        val key = key0 + "|" + key1
         val j = (set0 & (set1)).size
         val q = set0.union(set1).size
         val re = j.toDouble / q
-        (key, re)
+        MatrixEntry(key0.toInt, key1.toInt, re)
       }
     }
-    re
+    val simMat: CoordinateMatrix = new CoordinateMatrix(entries)
+    simMat
   }
+
+  def computeCosSimWithDF(sc: SparkContext, featurePath: String): CoordinateMatrix = {
+    val sqlContext = new SQLContext(sc)
+    val rdd = sqlContext.read.parquet(featurePath).select("features")
+      .rdd
+      .map(x => x(0).asInstanceOf[org.apache.spark.mllib.linalg.Vector])
+    val mat = new RowMatrix(rdd)
+    val simMat = mat.columnSimilarities()
+    simMat
+  }
+
+  /**
+   * @param sc
+   * @param featureSize number of features
+   * @param featurePath
+   */
+  def computeCosSimWithLibSVM(sc: SparkContext, featureSize: Int, featurePath: String): CoordinateMatrix = {
+    val rows = sc.textFile(featurePath).map(_.split(" "))
+      .map(x => (x.filter(g => g.contains(":"))))
+      .map(x => (x.map(_.split(":")).map(ar => (ar(0).toInt, ar(1).toDouble))))
+      .map(x => (Vectors.sparse(featureSize, x)))
+    val mat = new RowMatrix(rows)
+    val simMat = mat.columnSimilarities()
+    simMat
+  }
+
+  def loadSimMatrix(sc: SparkContext, simPath: String, featruesSize: Int): SparseMatrix = {
+    val entries = sc.textFile(simPath)
+      .map(_.split("\\|", -1))
+      .map(x => (x(0).toInt, x(1).toInt, x(2).toDouble))
+      .collect()
+    val simMatrix = SparseMatrix.fromCOO(featruesSize, featruesSize, entries)
+    simMatrix
+  }
+
+  /**
+   * Save the similarity matrix as a text file
+   * @param sc
+   * @param savePath
+   * @param mat
+   */
+  def saveSimMatrix(sc: SparkContext, savePath: String, mat: CoordinateMatrix): Unit = {
+    val sim = mat
+    sim.entries.map(x => x.i + "|" + x.j + "|" + x.value)
+      .coalesce(1).saveAsTextFile(savePath)
+  }
+
+  def predictByMatrix(sc: SparkContext, simMatrix: breeze.linalg.Matrix[Double], featuresSize: Int,
+                      featurePath: String, resultPath: String): Unit = {
+    val rdd = sc.textFile(featurePath)
+      .map(_.split(" "))
+      .map(x => (x.filter(g => g.contains(":"))))
+      .map(x => (x.map(_.split(":")).map(ar => (ar(0).toInt, ar(1).toDouble))))
+      .map(x => {
+        val idx = x.map(_._1)
+        val v = x.map(_._2)
+        val vec: SparseVector[Double] = new SparseVector(idx, v, featuresSize)
+        vec
+      })
+    //  .map(x => (x.toDenseVector.toDenseMatrix.dot(simMatrix)))
+  }
 }
 
-object ItemCF {
+object ItemCF extends ItemCF {
   def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("utils").setMaster("local[8]")
+    val sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+    val libsvmFeaturePath = "C:\\workspace\\data\\apacheCN\\libsvmOut"
+    val dfFeaturePath = "C:\\workspace\\data\\apacheCN\\dfOut"
+    val simPath = "C:\\workspace\\data\\apacheCN\\simPath"
+    // val JaccardSimPath = "..//data//jaccardSim"
+    val CosSimPath = "..//data//cosSim"
+    val featureSize = 3953
+
+    // val sim1 = computeJaccardSimWithLibSVM(sc, libsvmFeaturePath)
+    // sim1.entries.take(10)
+    val sim2 = computeCosSimWithLibSVM(sc, featureSize, libsvmFeaturePath)
+    // sim2.entries.take(10).foreach(println)
+    // saveSimMatrix(sc, simPath, sim2)
+    val sim3 = computeJaccardSimWithDF(sc, dfFeaturePath)
+    sim3.entries.take(10)
+    val sim4 = computeCosSimWithDF(sc, dfFeaturePath)
+    sim4.entries.take(10)
+    // computeItemJaccardSim(sc, featurePath, JaccardSimPath)
+    // computeItemCosSim(sc, 100, featurePath, CosSimPath)
+    val simMatrix = loadSimMatrix(sc, simPath, featureSize)
+    // val score = predict()
   }
 }
```
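The core of both Jaccard methods above is intersection-over-union on `BitSet`s of indices. Here is a minimal standalone sketch (the `JaccardSketch` object, its `jaccard` helper, and the sample sets are illustrative, not from the repository):

```scala
import scala.collection.BitSet

object JaccardSketch extends App {
  // Jaccard similarity: |A ∩ B| / |A ∪ B|, exactly the computation in the cartesian step above.
  def jaccard(a: BitSet, b: BitSet): Double =
    (a & b).size.toDouble / a.union(b).size

  val set0 = BitSet(1, 2, 3, 5) // hypothetical index set for one item
  val set1 = BitSet(2, 3, 6)    // hypothetical index set for another item
  println(jaccard(set0, set1))  // |{2, 3}| / |{1, 2, 3, 5, 6}| = 0.4
}
```

Representing each index set as a `BitSet` keeps the pairwise `cartesian` step cheap, since intersection and union reduce to bitwise operations.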
spark/src/main/scala/com/apachecn/recommand/colfliter/sparkUtils.scala (changed: +0, -5)

```diff
@@ -10,11 +10,6 @@ object sparkUtils {
   val conf = new SparkConf().setAppName("utils").setMaster("local")
   val sc = new SparkContext(conf)
 
-  def selectData(): Unit = {
-    val in = "C:\\dtworkspace\\recommand\\data\\ratings"
-    val rdd = sc.textFile(in).map(x => (x, x.split("::")(1).toInt))
-      .filter(x => (x._2 < 1000)).map(_._1).coalesce(1).saveAsTextFile(in + "out")
-  }
 }
```
spark/src/test/scala/samples/scalatest.scala
已删除
100644 → 0
浏览文件 @
cd9091a8
/*
* Copyright 2001-2009 Artima, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
samples
/*
ScalaTest facilitates different styles of testing by providing traits you can mix
together to get the behavior and syntax you prefer. A few examples are
included here. For more information, visit:
http://www.scalatest.org/
One way to use ScalaTest is to help make JUnit or TestNG tests more
clear and concise. Here's an example:
*/
import
scala.collection.mutable.Stack
import
org.scalatest.Assertions
import
org.junit.Test
class
StackSuite
extends
Assertions
{
@Test
def
stackShouldPopValuesIinLastInFirstOutOrder
()
{
val
stack
=
new
Stack
[
Int
]
stack
.
push
(
1
)
stack
.
push
(
2
)
assert
(
stack
.
pop
()
===
2
)
assert
(
stack
.
pop
()
===
1
)
}
@Test
def
stackShouldThrowNoSuchElementExceptionIfAnEmptyStackIsPopped
()
{
val
emptyStack
=
new
Stack
[
String
]
intercept
[
NoSuchElementException
]
{
emptyStack
.
pop
()
}
}
}
/*
Here's an example of a FunSuite with ShouldMatchers mixed in:
*/
import
org.scalatest.FunSuite
import
org.scalatest.matchers.ShouldMatchers
import
org.junit.runner.RunWith
import
org.scalatest.junit.JUnitRunner
@RunWith
(
classOf
[
JUnitRunner
])
class
ListSuite
extends
FunSuite
with
ShouldMatchers
{
test
(
"An empty list should be empty"
)
{
List
()
should
be
(
'empty
)
Nil
should
be
(
'empty
)
}
test
(
"A non-empty list should not be empty"
)
{
List
(
1
,
2
,
3
)
should
not
be
(
'empty
)
List
(
"fee"
,
"fie"
,
"foe"
,
"fum"
)
should
not
be
(
'empty
)
}
test
(
"A list's length should equal the number of elements it contains"
)
{
List
()
should
have
length
(
0
)
List
(
1
,
2
)
should
have
length
(
2
)
List
(
"fee"
,
"fie"
,
"foe"
,
"fum"
)
should
have
length
(
4
)
}
}
/*
ScalaTest also supports the behavior-driven development style, in which you
combine tests with text that specifies the behavior being tested. Here's
an example whose text output when run looks like:
A Map
- should only contain keys and values that were added to it
- should report its size as the number of key/value pairs it contains
*/
import
org.scalatest.FunSpec
import
scala.collection.mutable.Stack
class
ExampleSpec
extends
FunSpec
{
describe
(
"A Stack"
)
{
it
(
"should pop values in last-in-first-out order"
)
{
val
stack
=
new
Stack
[
Int
]
stack
.
push
(
1
)
stack
.
push
(
2
)
assert
(
stack
.
pop
()
===
2
)
assert
(
stack
.
pop
()
===
1
)
}
it
(
"should throw NoSuchElementException if an empty stack is popped"
)
{
val
emptyStack
=
new
Stack
[
Int
]
intercept
[
NoSuchElementException
]
{
emptyStack
.
pop
()
}
}
}
}
spark/src/test/scala/samples/specs.scala (deleted: 100644 → 0)

```scala
package samples

import org.junit.runner.RunWith
import org.specs2.mutable._
import org.specs2.runner._

/**
 * Sample specification.
 *
 * This specification can be executed with: scala -cp <your classpath=""> ${package}.SpecsTest
 * Or using maven: mvn test
 *
 * For more information on how to write or run specifications, please visit:
 * http://etorreborre.github.com/specs2/guide/org.specs2.guide.Runners.html
 *
 */
@RunWith(classOf[JUnitRunner])
class MySpecTest extends Specification {

  "The 'Hello world' string" should {
    "contain 11 characters" in {
      "Hello world" must have size (11)
    }
    "start with 'Hello'" in {
      "Hello world" must startWith("Hello")
    }
    "end with 'world'" in {
      "Hello world" must endWith("world")
    }
  }
}
```
standalone/col_filtering/item_cf.py (changed: +1, -1)

```diff
@@ -134,7 +134,7 @@ class ItemCF(object):
 if __name__ == '__main__':
-    ratingfile = "C://workspace//data//ml-1m//ml-1m//ratings.dat"
+    ratingfile = "../../data/ratings"
     item_cf = ItemCF()
     item_cf.generate_dataset(ratingfile)
     item_cf.calc_movie_sim()
```
standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/5df639254df90ffb4b58eba85a36303c/metadata.json (new file: 0 → 100644)

```json
{"duration": 0.02300095558166504, "input_args": {"filename": "'C:/dtworkspace/RecommenderSystems/data/ratingslibsvm'"}}
```
standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/5df639254df90ffb4b58eba85a36303c/output.pkl (new file: 0 → 100644; binary file added)
standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/9c0e4967ea839d4f938fcd3dc25572d0/metadata.json (new file: 0 → 100644)

```json
{"duration": 0.03000164031982422, "input_args": {"filename": "'../../data/ratingslibsvm'"}}
```
standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/9c0e4967ea839d4f938fcd3dc25572d0/output.pkl (new file: 0 → 100644; binary file added)
standalone/col_filtering/mycache/joblib/__main__-C%3A-dtworkspace-RecommenderSystems-standalone-col_filtering-similarity_by_sklearn/get_data/func_code.py (new file: 0 → 100644)

```python
# first line: 24
@mem.cache
def get_data(filename):
    data = load_svmlight_file(filename)
    return data[0], data[1]
```
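For context, the mycache tree is joblib's on-disk memoization store: a joblib `Memory` cache records the decorated function's source (func_code.py), per-call metadata such as the call duration and input arguments (metadata.json), and the pickled return value (output.pkl), keyed by a hash of the call arguments.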
standalone/col_filtering/similarity_by_sklearn.py (changed: +6, -3)

```diff
@@ -11,7 +11,6 @@ from sklearn.externals.joblib import Memory
 from sklearn.datasets import load_svmlight_file
-from sklearn.metrics import jaccard_similarity_score
 from sklearn.metrics.pairwise import cosine_similarity
 from sklearn.metrics.pairwise import pairwise_distances
 import numpy as np
 np.set_printoptions(suppress=True)
@@ -29,7 +28,7 @@ def get_data(filename):
 # Compute Jaccard similarity
 def get_jaccard_similarity(X):
-    n = X.shape[1]
+    n = X.T.shape[1]
     similarity = np.zeros([n, n])
     for i in range(n):
         v1 = X.T[i].toarray()
@@ -47,8 +46,12 @@ def get_consine_similarity(X):
     return similarity
 
-filename = "C:/dtworkspace/recommand/data/ratingslibsvm"
+filename = "../../data/ratingslibsvm"
 X, y = get_data(filename)
+
+consine_sim = get_consine_similarity(X)
+print(consine_sim)
+
 jaccard_sim = get_jaccard_similarity(X)
 print(jaccard_sim)
```
standalone/col_filtering/user_cf.py (changed: +1, -1)

```diff
@@ -170,7 +170,7 @@ class UserCF(object):
 if __name__ == '__main__':
-    ratingfile = "C://workspace//data//ml-1m//ml-1m//ratings.dat"
+    ratingfile = "../../data/ratings"
     usercf = UserCF()
     usercf.generate_dataset(ratingfile)
     usercf.calc_user_sim()
```