diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index e3bc416bcacd540005669a279d579b508c26af45..407c074b7f12c6ef17c165db1eb48dbc40530aab 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -46,8 +46,6 @@ import org.apache.flink.ml._ * function between successive iterations is is smaller than this value. * [[IterativeSolver.LearningRateMethodValue]] determines functional form of * effective learning rate. - * [[IterativeSolver.Decay]] Used in some functional forms for determining - * effective learning rate. */ abstract class GradientDescent extends IterativeSolver { @@ -192,10 +190,19 @@ abstract class GradientDescent extends IterativeSolver { (left, right) => val (leftGradVector, leftCount) = left val (rightGradVector, rightCount) = right - // Add the left gradient to the right one - BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights) + + // make the left gradient dense so that the following reduce operations (left fold) reuse + // it. This strongly depends on the underlying implementation of the ReduceDriver which + // always passes the new input element as the second parameter + val result = leftGradVector.weights match { + case d: DenseVector => d + case s: SparseVector => s.toDenseVector + } + + // Add the right gradient to the result + BLAS.axpy(1.0, rightGradVector.weights, result) val gradients = WeightVector( - rightGradVector.weights, leftGradVector.intercept + rightGradVector.intercept) + result, leftGradVector.intercept + rightGradVector.intercept) (gradients , leftCount + rightCount) }.mapWithBcVariableIteration(currentWeights){ diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 17b8a85a017406754ee2013e6de6001b8b113480..51214e643c59bc57bbff5a527af13a39009f59da 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -66,6 +66,32 @@ class MultipleLinearRegressionITSuite srs should be (expectedSquaredResidualSum +- 2) } + it should "work with sparse vectors as input" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val mlr = MultipleLinearRegression() + + val sparseInputDS = env.fromCollection(RegressionData.sparseData) + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 2.0) + parameters.add(MultipleLinearRegression.Iterations, 10) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + + mlr.fit(sparseInputDS, parameters) + + val weightList = mlr.weightsOption.get.collect() + + val WeightVector(weights, intercept) = weightList.head + + RegressionData.expectedWeightsSparseInput.toIterator zip weights.valueIterator foreach { + case (expectedWeight, weight) => + weight should be (expectedWeight +- 1) + } + intercept should be (RegressionData.expectedInterceptSparseInput +- 0.4) + } + it should "estimate a cubic function" in { val env = ExecutionEnvironment.getExecutionEnvironment diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala index 062f510be0ff68fb9babed9d36c2a234a9277f2e..0654ac248dfe4eb6e0e11e164636dafdf9ec2277 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala @@ -19,7 +19,7 @@ package org.apache.flink.ml.regression import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.math.{SparseVector, DenseVector} object RegressionData { @@ -27,6 +27,21 @@ object RegressionData { val expectedWeight0: Double = 9.8158 val expectedSquaredResidualSum: Double = 49.7596/2 + val sparseData: Seq[LabeledVector] = Seq( + new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))), + new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))), + new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))), + new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))), + new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))), + new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0)))) + + val expectedWeightsSparseInput = Array(0.5448906338353784, 0.15718880164669916, + 0.034001300318125725, 0.38770183218867915, 0.0, + 0.15718880164669916, 0.0, 0.0, 0.0, 0.15718880164669916) + + val expectedInterceptSparseInput = -0.006918274867886108 + + val data: Seq[LabeledVector] = Seq( LabeledVector(10.7949, DenseVector(0.2714)), LabeledVector(10.6426, DenseVector(0.1008)),