提交 fe0c3b53 编写于 作者: T Till Rohrmann

[FLINK-3330] [ml] Fix SparseVector support in GradientDescent

The GradientDescent implementation did not work with sparse input data
because it requires the gradient to be dense. This patch makes sure that
the gradient sum is always dense.

This closes #1587.
上级 8e3e2f8f
......@@ -46,8 +46,6 @@ import org.apache.flink.ml._
* function between successive iterations is is smaller than this value.
* [[IterativeSolver.LearningRateMethodValue]] determines functional form of
* effective learning rate.
* [[IterativeSolver.Decay]] Used in some functional forms for determining
* effective learning rate.
*/
abstract class GradientDescent extends IterativeSolver {
......@@ -192,10 +190,19 @@ abstract class GradientDescent extends IterativeSolver {
(left, right) =>
val (leftGradVector, leftCount) = left
val (rightGradVector, rightCount) = right
// Add the left gradient to the right one
BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights)
// make the left gradient dense so that the following reduce operations (left fold) reuse
// it. This strongly depends on the underlying implementation of the ReduceDriver which
// always passes the new input element as the second parameter
val result = leftGradVector.weights match {
case d: DenseVector => d
case s: SparseVector => s.toDenseVector
}
// Add the right gradient to the result
BLAS.axpy(1.0, rightGradVector.weights, result)
val gradients = WeightVector(
rightGradVector.weights, leftGradVector.intercept + rightGradVector.intercept)
result, leftGradVector.intercept + rightGradVector.intercept)
(gradients , leftCount + rightCount)
}.mapWithBcVariableIteration(currentWeights){
......
......@@ -66,6 +66,32 @@ class MultipleLinearRegressionITSuite
srs should be (expectedSquaredResidualSum +- 2)
}
it should "work with sparse vectors as input" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val mlr = MultipleLinearRegression()
val sparseInputDS = env.fromCollection(RegressionData.sparseData)
val parameters = ParameterMap()
parameters.add(MultipleLinearRegression.Stepsize, 2.0)
parameters.add(MultipleLinearRegression.Iterations, 10)
parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
mlr.fit(sparseInputDS, parameters)
val weightList = mlr.weightsOption.get.collect()
val WeightVector(weights, intercept) = weightList.head
RegressionData.expectedWeightsSparseInput.toIterator zip weights.valueIterator foreach {
case (expectedWeight, weight) =>
weight should be (expectedWeight +- 1)
}
intercept should be (RegressionData.expectedInterceptSparseInput +- 0.4)
}
it should "estimate a cubic function" in {
val env = ExecutionEnvironment.getExecutionEnvironment
......
......@@ -19,7 +19,7 @@
package org.apache.flink.ml.regression
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.math.{SparseVector, DenseVector}
object RegressionData {
......@@ -27,6 +27,21 @@ object RegressionData {
val expectedWeight0: Double = 9.8158
val expectedSquaredResidualSum: Double = 49.7596/2
val sparseData: Seq[LabeledVector] = Seq(
new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
val expectedWeightsSparseInput = Array(0.5448906338353784, 0.15718880164669916,
0.034001300318125725, 0.38770183218867915, 0.0,
0.15718880164669916, 0.0, 0.0, 0.0, 0.15718880164669916)
val expectedInterceptSparseInput = -0.006918274867886108
val data: Seq[LabeledVector] = Seq(
LabeledVector(10.7949, DenseVector(0.2714)),
LabeledVector(10.6426, DenseVector(0.1008)),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册