spark.mllib 提供了一系列的机器学习算法可以用来从数据中挖掘信息和进行预测。当这些算法被用来建立机器学习模型时,我们同样需要根据实际应用和需求去基于某些标准评估模型的效果。spark.mllib 同样提供了一整套指标用来满足评估机器学习模型效果的要求。
一些机器学习算法可被归类于更广泛的机器学习应用类型,比如分类问题、回归问题、聚类问题等。这里面每一种类型都有成熟的效果评估指标并且这些指标可以从 spark.mllib 中使用,在本章中将会仔细讲述。
虽然分类算法的种类有很多,但是分类模型的评估方法的原理都比较相似。在一个有监督分类问题中,对于每一个数据点都会存在一个真实的分类值(标签)和一个模型生成的预测分类值。基于这两个值,每个数据点的结果都可以被归纳为以下四个类别中的一种:
上面四种值是大多数分类器评估指标的基础。如果仅仅依靠最基本的准确率(预测是对的还是错的)去评估一个分类器的效果的话,这往往不是一个好的指标,原因在于一个数据集可能各个类别的分布非常不平衡。举例来说,如果一个预测欺诈的模型是根据这样的数据集设计出来的:95%的数据点是非欺诈数据,5%的数据点是欺诈数据,那么一个简单的分类器去预测非欺诈时,不考虑输入的影响,它的准确率将会是95%。因此,我们经常会使用 precision 和 recall 这样的指标,因为他们会将“错误”的类型考虑进去。在大多数应用中,precision 和 recall 会被合并为一个指标来在他们之间进行一些平衡,我们称这个指标为 F-measure。
二项分类器可以用来区分数据集中的元素到两个可能的组中(如欺诈和非欺诈),而且它是多项分类器的一种特殊情况。多数的二项分类评估指标可以被推广使用到多项分类评估中。
很多分类模型对于每一个类实际上会输出一个“分数”(经常是一个概率值),其中较高的分数表示更高的可能性,理解这一点非常重要。在二项分类场景中,模型会为每个类别输出一个概率:
和 。并不是所有场景都会取更高概率的类别,有些场景下模型可能需要进行调整来使得它只有在某个概率非常高的情况下才去预测这个类别(如只有在模型预测的欺诈概率高于90%的情况下才屏蔽这笔信用卡交易)。所以,根据模型输出的概率来确定预测的类别是由预测的阈值来决定的。调整预测阈值将会改变模型的 precision 和 recall,而且它是模型调优的重要环节。为了能够将 precision、recall 和其他评估指标根据阈值的改变是如何变化的这一情况可视化出来,一种常见的做法是通过阈值的参数化来绘制相互作对比的指标。一种图叫做 P-R 曲线,它会将 (precision, recall) 组成的点根据不同的阈值绘制出来,而另一种叫做 receiver operating characteristic 或者 ROC 曲线,它会绘制出 (recall, false positive rate) 组成的点。
可用的评估指标
指标 | 定义 |
---|---|
Precision(精确率) | |
Recall(召回率) | |
F-measure | |
Receiver Operating Characteristic(ROC) | |
Area Under ROC Curve(ROC 曲线下面积 | |
Area Under Precision-Recall Curve(P-R 曲线下面积) |
举例
下面代码片段阐释了如何加载一个数据集样本,并对数据训练一个二项分类模型,最后通过几个二项评估指标来评价该模型的效果。
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils // 以 LIBSVM 格式加载训练数据 val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt") // 分割数据为训练集(60%)和测试集(40%) val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L) training.cache() // 运行训练算法来建模 val model = new LogisticRegressionWithLBFGS() .setNumClasses(2) .run(training) // 清空预测阈值来使模型返回概率值 model.clearThreshold // 使用测试集计算原始分数 val predictionAndLabels = test.map { case LabeledPoint(label, features) => val prediction = model.predict(features) (prediction, label) } // 实例化指标对象 val metrics = new BinaryClassificationMetrics(predictionAndLabels) // 按阈值分别求 precision val precision = metrics.precisionByThreshold precision.foreach { case (t, p) => println(s"Threshold: $t, Precision: $p") } // 按阈值分别求 recall val recall = metrics.recallByThreshold recall.foreach { case (t, r) => println(s"Threshold: $t, Recall: $r") } // Precision-Recall 曲线 val PRC = metrics.pr // F-measure val f1Score = metrics.fMeasureByThreshold f1Score.foreach { case (t, f) => println(s"Threshold: $t, F-score: $f, Beta = 1") } val beta = 0.5 val fScore = metrics.fMeasureByThreshold(beta) f1Score.foreach { case (t, f) => println(s"Threshold: $t, F-score: $f, Beta = 0.5") } // AUPRC val auPRC = metrics.areaUnderPR println("Area under precision-recall curve = " + auPRC) // 计算 ROC 和 PR 曲线中使用的阈值 val thresholds = precision.map(_._1) // ROC 曲线 val roc = metrics.roc // AUROC val auROC = metrics.areaUnderROC println("Area under ROC = " + auROC)
请参考 LogisticRegressionWithLBFGS Scala docs 和 BinaryClassificationMetrics Scala docs 以获取更多 API 相关细节
在Spark开源项目的“examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassificationMetricsExample.scala”路径下可以找到完整的例子代码。
多项分类描述了这样一种分类问题,对于每个数据点都有
种可能的分类标签(当 时为二项分类问题)。举例来说,区分手写样本为0到9的数字就是一种有10个可能分类的多项分类问题。对于多类别评估指标,积极和消极的概念跟二项分类略有不同。预测值和实际分类仍然可以是积极的或消极的,但是它们必须被归类到某种特定的类别中。每一个类别标签和预测值都会分配给多个类别中的一个,所以它对于分配的类别来说是积极的,但对于其他类别则是消极的。因此,true positive 发生在预测值和实际分类相吻合的时候,而 true negative 发生在预测值和实际分类都不属于一个类别的时候。由于这个认识,对于一个数据样本而言存在多个 true negative。根据之前对于 positive 和 negative 的定义扩展到 false negative 和 false positive 时就相对直接了。
相对于只有两种可能分类的二项分类问题,多项分类问题有多种可能的类别,所以基于类别的评估指标概念则被引入。基于所有类别的 precision 的准确率计算方式为,任何类别被预测正确的次数(true positive)除以数据点的数量。基于一个类别的 precision 的计算方式为,某个特定的类别被预测正确的次数除以这个类别在结果中出现的次数。
可用的评估指标
定义类别或标签序列为:
真实的输出向量
包含 个元素:
对于
个元素,一个多类别预测算法会产生一个预测向量 :
一个修正的 delta 函数
将会证明是有用的:
指标 | 定义 |
---|---|
Confusion Matrix(混淆矩阵) | |
Accuracy(准确率) | |
Precision by label(标签 precision) | |
Recall by label(标签 recall) | |
F-measure by label(标签 F-measure) | |
Weighted precision(加权 precision) | |
Weighted recall(加权 recall) | |
Weighted F-measure(加权 F-measure) |
举例
下面代码片段阐释了如何加载一个数据集样本,并对数据训练一个多项分类模型,最后通过多项分类评估指标评估模型效果。
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils // 以 LIBSVM 格式加载训练数据 val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt") // 分割数据为训练集(60%)和测试集(40%) val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L) training.cache() // 运行训练算法来建模 val model = new LogisticRegressionWithLBFGS() .setNumClasses(3) .run(training) // 使用测试集计算原始分数 val predictionAndLabels = test.map { case LabeledPoint(label, features) => val prediction = model.predict(features) (prediction, label) } // 实例化指标对象 val metrics = new MulticlassMetrics(predictionAndLabels) // 混淆矩阵 println("Confusion matrix:") println(metrics.confusionMatrix) // 总体统计结果 val accuracy = metrics.accuracy println("Summary Statistics") println(s"Accuracy = $accuracy") // 基于类别计算 Precision val labels = metrics.labels labels.foreach { l => println(s"Precision($l) = " + metrics.precision(l)) } // 基于类别计算 Recall labels.foreach { l => println(s"Recall($l) = " + metrics.recall(l)) } // 基于类别计算 False positive rate labels.foreach { l => println(s"FPR($l) = " + metrics.falsePositiveRate(l)) } // 基于类别计算 F-measure labels.foreach { l => println(s"F1-Score($l) = " + metrics.fMeasure(l)) } // 加权统计结果 println(s"Weighted precision: ${metrics.weightedPrecision}") println(s"Weighted recall: ${metrics.weightedRecall}") println(s"Weighted F1 score: ${metrics.weightedFMeasure}") println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
参考 MulticlassMetrics Scala docs 来获取更多 API 相关细节
在Spark repo中的“examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassificationMetricsExample.scala”中查找完整示例代码。
多标签分类问题指的是将一个数据集中的每一个样本映射到一组分类标签中去。在这一类分类问题中,各个标签所包含的样本并不是互斥的。举例来说,将一组新闻稿分类到不同的主题中去,一篇稿子可能既属于科技类,又属于政治类。
由于标签不是彼此互斥,预测值和真实的分类标签就成为了标签集合的向量,而非标签的向量。于是多标签评估指标就可以从基本的 precision、recall 等概念扩展到对集合的操作上。例如,对于某个类的 true positive 就发生在当这个类别存在于某个数据点的预测值集合中,并且也存在于它的真实类别集合中时。
可用指标
我们定义一个集合
包含 个文档
定义
为全体标签集合, 为全体预测结果集合,L_i 和 P_i 分别为文档 d_i 的标签集合和预测值集合。那么所有独立标签的集合则表示为
对于集合 A 的标识函数将会被使用
指标 | 定义 |
---|---|
Precision | |
Recall | |
Accuracy | |
Precision by label | |
Recall by label | |
F1-measure by label | |
Hamming Loss | |
Subset Accuracy | |
F1 Measure | |
Micro precision | |
Micro recall | |
Micro F1 Measure |
举例
以下代码片段阐释了如何去评估一个多标签分类器的效果。下面的多标签分类例子中使用了伪造的预测数据和标签数据。
对于每一篇文章的预测结果为:
对于每一类别的预测结果为:
每一类别实际包含的文章为:
import org.apache.spark.mllib.evaluation.MultilabelMetrics import org.apache.spark.rdd.RDD val scoreAndLabels: RDD[(Array[Double], Array[Double])] = sc.parallelize( Seq((Array(0.0, 1.0), Array(0.0, 2.0)), (Array(0.0, 2.0), Array(0.0, 1.0)), (Array.empty[Double], Array(0.0)), (Array(2.0), Array(2.0)), (Array(2.0, 0.0), Array(2.0, 0.0)), (Array(0.0, 1.0, 2.0), Array(0.0, 1.0)), (Array(1.0), Array(1.0, 2.0))), 2) // 实例化指标对象 val metrics = new MultilabelMetrics(scoreAndLabels) // 总体统计结果 println(s"Recall = ${metrics.recall}") println(s"Precision = ${metrics.precision}") println(s"F1 measure = ${metrics.f1Measure}") println(s"Accuracy = ${metrics.accuracy}") // 基于每一标签的统计结果 metrics.labels.foreach(label => println(s"Class $label precision = ${metrics.precision(label)}")) metrics.labels.foreach(label => println(s"Class $label recall = ${metrics.recall(label)}")) metrics.labels.foreach(label => println(s"Class $label F1-score = ${metrics.f1Measure(label)}")) // Micro stats println(s"Micro recall = ${metrics.microRecall}") println(s"Micro precision = ${metrics.microPrecision}") println(s"Micro F1 measure = ${metrics.microF1Measure}") // Hamming loss println(s"Hamming loss = ${metrics.hammingLoss}") // Subset accuracy println(s"Subset accuracy = ${metrics.subsetAccuracy}")
参考 MultilabelMetrics Scala docs 以获取更多 API 相关细节
在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/MultiLabelMetricsExample.scala”路径下可以找到完整示例代码。
排名算法(通常被作为推荐系统算法)的作用是基于一些训练数据给用户返回一个相关物品或文章的集合。相关性的定义会根据不同的应用场景而有所差异。排名系统评估指标则用于在不同的场景中量化排名或者推荐的效果。某些指标会把系统推荐的文章集合跟实际相关的文章集合做对比,而另外一些指标会显式地结合数值打分。
可用的指标
一个排名系统通常会处理一个由
个用户组成的集合
每一个用户
有一个由 个真正相关的文章组成的集合
并且还有一个由
个推荐文章组成的有序集合,按照相关度降序排序
排序系统的目标就是给每个用户提供与其相关度最高的若干个文章集合。集合的相关度以及算法的有效性可以用下列评估指标进行度量。
度量 | 定义 | 说明 |
---|---|---|
Precision at k | Precision at k 是用来度量对所有用户来说,平均前k个推荐的文章里有几个在真正相关的文章集合中出现。这个指标不考虑推荐的顺序问题。 | |
Mean Average Precision | MAP 是用来度量推荐的文章集合里有几个出现在真正相关的文章集合中,并且考虑推荐的顺序性(比如文章的相关性越高,它出错的惩罚就越大)。 | |
Normalized Discounted Cumulative Gain | NDCG at k 是用来度量对所有用户来说,平均前k个推荐的文章里有几个在真正相关的文章集合中出现。跟 Precision at k 不同,这个指标考虑推荐的顺序性(假设文章按照相关性降序排序)。 |
举例
下面代码片段阐释了如何加载一个数据集样本,并使用数据训练一个交替最小二乘法推荐算法模型,最后通过几个排名评估指标来评价推荐系统效果。下面简单总结了一下使用的方法。
对于影片的打分在 1 - 5 分区间:
那么如果预测的打分小于3分,我们将不会推荐这部影片。我们使用下面映射关系来表示置信分数:
这个映射关系表示没有被用户察觉到的影片通常在“还可以”和“不怎么样”之间。权重0在这里的语义表示“跟这个影片从来没有过任何互动”。
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics} import org.apache.spark.mllib.recommendation.{ALS, Rating} // 读取打分数据 val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line => val fields = line.split("::") Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) }.cache() // 将打分映射到0或者1,1表示影片应该被推荐 val binarizedRatings = ratings.map(r => Rating(r.user, r.product, if (r.rating > 0) 1.0 else 0.0)).cache() // 评分的统计结果 val numRatings = ratings.count() val numUsers = ratings.map(_.user).distinct().count() val numMovies = ratings.map(_.product).distinct().count() println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.") // 建模 val numIterations = 10 val rank = 10 val lambda = 0.01 val model = ALS.train(ratings, rank, numIterations, lambda) // 定义一个函数将打分映射到0和1区间 def scaledRating(r: Rating): Rating = { val scaledRating = math.max(math.min(r.rating, 1.0), 0.0) Rating(r.user, r.product, scaledRating) } // 获取排序后对于每个用户前十个预测值,并映射到0和1区间 val userRecommended = model.recommendProductsForUsers(10).map { case (user, recs) => (user, recs.map(scaledRating)) } // 假设任何被用户打分为3分或更高(映射到1)的影片为一个相关的影片 // 并与前十个最相关的影片做对比 val userMovies = binarizedRatings.groupBy(_.user) val relevantDocuments = userMovies.join(userRecommended).map { case (user, (actual, predictions)) => (predictions.map(_.product), actual.filter(_.rating > 0.0).map(_.product).toArray) } // 实例化一个评估指标模型 val metrics = new RankingMetrics(relevantDocuments) // Precision at K Array(1, 3, 5).foreach { k => println(s"Precision at $k = ${metrics.precisionAt(k)}") } // Mean average precision println(s"Mean average precision = ${metrics.meanAveragePrecision}") // Normalized discounted cumulative gain Array(1, 3, 5).foreach { k => println(s"NDCG at $k = ${metrics.ndcgAt(k)}") } // 获得每个数据点的 Prediction val allPredictions = model.predict(ratings.map(r => (r.user, r.product))).map(r => ((r.user, r.product), r.rating)) val allRatings = ratings.map(r => ((r.user, r.product), r.rating)) val predictionsAndLabels = allPredictions.join(allRatings).map { case ((user, product), (predicted, actual)) => (predicted, actual) } // 使用回归评估指标获得 RMSE val regressionMetrics = new RegressionMetrics(predictionsAndLabels) println(s"RMSE = ${regressionMetrics.rootMeanSquaredError}") // R-squared println(s"R-squared = ${regressionMetrics.r2}")
参考 RegressionMetrics Scala docs 和 RankingMetrics Scala docs 获取更多 API 相关细节
在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala”路径下可以找到完整示例代码。
Regression analysis(回归分析)被用于从一系列自变量中预测连续的因变量的场景中。
可用的指标
指标 | 定义 |
---|---|
Mean Squared Error (MSE) | |
Root Mean Squared Error (RMSE) | |
Mean Absolute Error (MAE) | |
Coefficient of Determination | |
Explained Variance |
举例
下面代码片段阐释了如何加载一个数据集样本,并使用数据训练一个线性回归算法模型,最后通过几个回归评估指标来对模型效果进行评价的过程。
import org.apache.spark.mllib.evaluation.RegressionMetrics import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD} // 加载数据 val data = spark .read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt") .rdd.map(row => LabeledPoint(row.getDouble(0), row.get(1).asInstanceOf[Vector])) .cache() // 建模 val numIterations = 100 val model = LinearRegressionWithSGD.train(data, numIterations) // 获取预测值 val valuesAndPreds = data.map{ point => val prediction = model.predict(point.features) (prediction, point.label) } // 实例化一个回归指标对象 val metrics = new RegressionMetrics(valuesAndPreds) // Squared error println(s"MSE = ${metrics.meanSquaredError}") println(s"RMSE = ${metrics.rootMeanSquaredError}") // R-squared println(s"R-squared = ${metrics.r2}") // Mean absolute error println(s"MAE = ${metrics.meanAbsoluteError}") // Explained variance println(s"Explained variance = ${metrics.explainedVariance}")
请参考 RegressionMetrics Scala docs 来获取更多 API 相关细节
在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/RegressionMetricsExample.scala”路径下可以找到完整示例代码。