我们提供对 RDD[Vector] 中的向量的统计,采用 Statistics 里的函数 colStats
colStats()
返回一个 MultivariateStatisticalSummary 的实例
, 包含列的最大值、最小值、均值、方差和非零的数量以及总数量。
阅读 MultivariateStatisticalSummary
Scala 文档查看 API 细节
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} val observations = sc.parallelize( Seq( Vectors.dense(1.0, 10.0, 100.0), Vectors.dense(2.0, 20.0, 200.0), Vectors.dense(3.0, 30.0, 300.0) ) ) // 计算概要统计. val summary: MultivariateStatisticalSummary = Statistics.colStats(observations) println(summary.mean) // 每一列的均值 println(summary.variance) // 所有向量的方差 println(summary.numNonzeros) // 每列中的非零数
更完整的代码请在 Spark 项目查看"examples/src/main/scala/org/apache/spark/examples/mllib/SummaryStatisticsExample.scala".
计算两个系列(series)数据之间的相关性的数据是在统计学一种常见的操作。在 spark.mllib 我们提供灵活的计算两两之间的相关性的方法。支持计算相关性的方法目前有 Pearson’s and Spearman’s (皮尔森和斯皮尔曼) 的相关性.
Statistics类
提供了计算系列之间相关性的方法。 根据输入类型,两个RDD [Double]或RDD [Vector],输出分别为Double或相关矩阵。
阅读 Statistics
Scala 文档查看 API 细节
import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD val seriesX: RDD[Double] = sc.parallelize(Array(1, 2, 3, 3, 5)) // a series // 必须具有相同数量的分区和基数作为一个series val seriesY: RDD[Double] = sc.parallelize(Array(11, 22, 33, 33, 555)) // 计算 Pearson's 相关性.输入"spearman" 调用 Spearman's 的计算方法. // 默认使用 Pearson's 方法. val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson") println(s"Correlation is: $correlation") val data: RDD[Vector] = sc.parallelize( Seq( Vectors.dense(1.0, 10.0, 100.0), Vectors.dense(2.0, 20.0, 200.0), Vectors.dense(5.0, 33.0, 366.0)) ) // 注意每个向量为行而不是列 // 计算 Pearson's 相关性的矩阵.输入"spearman" 调用 Spearman's 的计算方法. // 默认使用 Pearson's 方法. val correlMatrix: Matrix = Statistics.corr(data, "pearson") println(correlMatrix.toString)
更完整的代码请在 Spark 项目查看"examples/src/main/scala/org/apache/spark/examples/mllib/CorrelationsExample.scala".
与spark.mllib中的其他统计功能不同,sampleByKey和sampleByKeyExact可以对键值对的RDD执行分层采样方法。 对于分层采样,键可以被认为是一个标签,该值作为一个特定属性。 例如,key 可以是男人或女人或文档ID,并且相应的 value 可以是人的年龄列表或文档中的单词列表。 sampleByKey方法将类似掷硬币方式来决定观察是否被采样,因此需要一次遍历数据,并提供期望的样本大小。 sampleByKeyExact需要比sampleByKey中使用的每层简单随机抽样花费更多的资源,但将提供99.99%置信度的确切抽样大小。 python当前不支持sampleByKeyExact。
sampleByKeyExact()
允许用户精确地采样⌈fk⋅nk⌉∀k∈K⌈fk⋅nk⌉∀k∈K项,其中fk fk是关键kk的期望分数,nk nk是关键kk的键值对的数量, 而KK是一组键。 无需更换的采样需要额外通过RDD以保证采样大小,而替换的采样需要两次额外的通过。
allows users to sample exactly ⌈f k ⋅n k ⌉ ∀k ∈K ⌈fk⋅nk⌉∀k∈K items, where f k fk is the desired fraction for key k k, n k nk is the number of key-value pairs for key k k, and K K is the set of keys. Sampling without replacement requires one additional pass over the RDD to guarantee sample size, whereas sampling with replacement requires two additional passes.
// RDD[(K, V)] 任何键值对 val data = sc.parallelize( Seq((1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f'))) // 指明每个Key的权重 val fractions = Map(1 -> 0.1, 2 -> 0.6, 3 -> 0.3) // 从每层获取一个近似的样本 val approxSample = data.sampleByKey(withReplacement = false, fractions = fractions) // 从每层获取一个确切的样本 val exactSample = data.sampleByKeyExact(withReplacement = false, fractions = fractions)
更完整的代码请在 Spark 项目查看"examples/src/main/scala/org/apache/spark/examples/mllib/StratifiedSamplingExample.scala".
假设检验是统计学中强大的工具,用于确定结果是否具有统计学意义,无论该结果是否偶然发生。 spark.mllib目前支持Pearson的卡方(χ2χ2)测试,以获得拟合优度和独立性。 输入数据类型确定是否进行拟合优度或独立性测试。 拟合优度测试需要输入类型的Vector,而独立性测试需要一个 Matrix
作为输入。
spark.mllib还支持输入类型
RDD[LabeledPoint]
通过卡方独立测试启用特征选择。
Statistics 提供运行Pearson卡方检验的方法。 以下示例演示如何运行和解释假设检验。
mport org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics import org.apache.spark.mllib.stat.test.ChiSqTestResult import org.apache.spark.rdd.RDD // 由事件频率组成的向量 val vec: Vector = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25) // 计算适合度。 如果没有提供第二个测试向量 // 作为参数,针对均匀分布运行测试。 val goodnessOfFitTestResult = Statistics.chiSqTest(vec) // 测试包括p值,自由度,检验统计量,使用方法,和零假设 println(s"$goodnessOfFitTestResult\n") // a contingency matrix. Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val mat: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // conduct Pearson's independence test on the input contingency matrix // 对输入的矩阵进行Pearson独立性测试 val independenceTestResult = Statistics.chiSqTest(mat) // summary of the test including the p-value, degrees of freedom // 测试:包括p值,自由度 println(s"$independenceTestResult\n") val obs: RDD[LabeledPoint] = sc.parallelize( Seq( LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 0.0)), LabeledPoint(-1.0, Vectors.dense(-1.0, 0.0, -0.5) ) ) ) // (feature, label) pairs. // The contingency table is constructed from the raw (feature, label) pairs and used to conduct // the independence test. Returns an array containing the ChiSquaredTestResult for every feature // against the label. ///应急表由原始的(特征,标签)对构成,用于进行独立性测试。 返回一个包含针对标签的每个功能的ChiSquaredTestResult的数组。 val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs) featureTestResults.zipWithIndex.foreach { case (k, v) => println("Column " + (v + 1).toString + ":") println(k) } // summary of the test
更完整的代码请在 Spark 项目查看"examples/src/main/scala/org/apache/spark/examples/mllib/HypothesisTestingExample.scala".
此外, spark.mllib
提供了对于概率分布相等的Kolmogorov-Smirnov(KS)测试的单样本双侧实现。 通过提供理论分布(目前仅为正态分布支持)及其参数的名称,或根据给定理论分布计算累积分布的函数,用户可以测试其假设,即样本服从该分布。 在用户根据正态分布(distName =“norm”)进行测试但不提供分发参数的情况下,测试将初始化为标准正态分布并记录适当的消息。
Statistics 提供了运行单样本,双侧Kolmogorov-Smirnov检验的方法。 以下示例演示如何运行和解释假设检验。
阅读 Statistics
Scala 文档查看 API 细节
import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD val data: RDD[Double] = sc.parallelize(Seq(0.1, 0.15, 0.2, 0.3, 0.25)) // an RDD of sample data // run a KS test for the sample versus a standard normal distribution val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1) // summary of the test including the p-value, test statistic, and null hypothesis if our p-value // indicates significance, we can reject the null hypothesis. println(testResult) println() // perform a KS test using a cumulative distribution function of our making val myCDF = Map(0.1 -> 0.2, 0.15 -> 0.6, 0.2 -> 0.05, 0.3 -> 0.05, 0.25 -> 0.1) val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF) println(testResult2)
spark.mllib
提供了一些测试的在线实现,以支持A / B测试用例等。 这些测试可以在Spark Streaming DStream [(Boolean,Double)]上执行,其中每个元组的第一个元素指示控制组(false)或处理组(true),第二个元素是观察值。
流式测试显示支持以下参数:
peacePeriod
- 从流中忽略的初始数据点数,用于减轻新奇效应.windowSize
- - 执行假设测试的过去批次数。 设置为0将使用所有之前的批次执行累积处理。StreamingTest
提供流式假设检验.
val data = ssc.textFileStream(dataDir).map(line => line.split(",") match { case Array(label, value) => BinarySample(label.toBoolean, value.toDouble) }) val streamingTest = new StreamingTest() .setPeacePeriod(0) .setWindowSize(0) .setTestMethod("welch") val out = streamingTest.registerStream(data) out.print()
更完整的代码请在 Spark 项目查看 "examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala".
随机数据生成对于随机算法,原型设计和性能测试很有用。 spark.mllib支持使用i.i.d.生成随机RDD。 从给定分布绘制的值:均匀,标准正态或泊松分布。
RandomRDDs
提供工厂方法来生成随机double型RDD或向量RDD。 以下示例生成随机double型RDD,其值遵循标准正态分布N(0,1),然后映射到N(1,4)。
RandomRDDs
Scala docs 文档查看 API 细节import org.apache.spark.SparkContext import org.apache.spark.mllib.random.RandomRDDs._ val sc: SparkContext = ... // Generate a random double RDD that contains 1 million i.i.d. values drawn from the // standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions. val u = normalRDD(sc, 1000000L, 10) // Apply a transform to get a random double RDD following `N(1, 4)`. val v = u.map(x => 1.0 + 2.0 * x)
核密度估计 是一种用于可视化经验概率分布的技术,而不需要对所观察到的样本的特定分布进行假设。 它计算在给定集合点评估的随机变量的概率密度函数的估计。 它通过在特定点表达PDF的经验分布来实现这一估计,这是以每个样本为中心的正态分布的PDF平均值.
KernelDensity
提供了从RDD样本计算核密度估计的方法。 以下示例演示如何执行此操作。
阅读 KernelDensity
文档查看 API 细节
import org.apache.spark.mllib.stat.KernelDensity import org.apache.spark.rdd.RDD // an RDD of sample data val data: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 9)) // Construct the density estimator with the sample data and a standard deviation // for the Gaussian kernels val kd = new KernelDensity() .setSample(data) .setBandwidth(3.0) // Find density estimates for the given values val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
更完整的代码请在 Spark 项目查看 "examples/src/main/scala/org/apache/spark/examples/mllib/KernelDensityEstimationExample.scala" .