From 63504a3cb331b9fb29cf833eeaa66afe5ba5d5f1 Mon Sep 17 00:00:00 2001 From: chobeat Date: Tue, 17 May 2016 14:19:15 +0200 Subject: [PATCH] [FLINK-3919] [ml] Implement DistributedRowMatrix This closes #1996. --- .../math/distributed/DistributedMatrix.scala | 39 +++++ .../distributed/DistributedRowMatrix.scala | 161 ++++++++++++++++++ .../DistributedRowMatrixSuite.scala | 104 +++++++++++ 3 files changed, 304 insertions(+) create mode 100644 flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala create mode 100644 flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala create mode 100644 flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrixSuite.scala diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala new file mode 100644 index 00000000000..f34641e0a24 --- /dev/null +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +/** Common trait used by distributed data structures representing a matrix. */ +trait DistributedMatrix { + /** Returns number of rows in matrix. + * + * @return Number of rows + */ + def numRows: Int + + /** Returns number of columns in matrix. + * + * @return Number of columns + */ + def numCols: Int +} + +object DistributedMatrix { + type MatrixColIndex = Int + type MatrixRowIndex = Int +} diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala new file mode 100644 index 00000000000..9092e5c3038 --- /dev/null +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.distributed.DistributedMatrix._ +import org.apache.flink.ml.math._ + +/** Represents distributed row-major matrix. + * + * @param data [[DataSet]] which contains [[IndexedRow]]s + * @param numRows Number of rows + * @param numCols Number of columns + */ +class DistributedRowMatrix( + val data: DataSet[IndexedRow], + val numRows: Int, + val numCols: Int +) extends DistributedMatrix { + + /** Collects the data in the form of a sequence of coordinates associated with their values. + * This operation immediately triggers program execution. + */ + def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = { + val localRows = data.collect() + + for { + IndexedRow(rowIndex, vector) <- localRows + (columnIndex, value) <- vector + } yield (rowIndex, columnIndex, value) + } + + /** Collects the data in the form of a SparseMatrix. This operation immediately triggers program + * execution. + */ + def toLocalSparseMatrix: SparseMatrix = { + val localMatrix = SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) + require(localMatrix.numRows == this.numRows) + require(localMatrix.numCols == this.numCols) + + localMatrix + } + + // TODO: convert to dense representation on the distributed matrix and collect it afterward + /** Collects the data in the form of a DenseMatrix. This operation immediately triggers program + * execution. + */ + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** Applies a high-order function to couple of rows. + * + * @param func a function to be applied + * @param other a [[DistributedRowMatrix]] to apply the function together + */ + def byRowOperation( + func: (Vector, Vector) => Vector, + other: DistributedRowMatrix + ): DistributedRowMatrix = { + val otherData = other.data + require(this.numCols == other.numCols) + require(this.numRows == other.numRows) + + val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { + val row1 = Option(left) match { + case Some(row: IndexedRow) => row + case None => + IndexedRow(right.rowIndex, SparseVector.fromCOO(right.values.size, List((0, 0.0)))) + } + val row2 = Option(right) match { + case Some(row: IndexedRow) => row + case None => + IndexedRow(left.rowIndex, SparseVector.fromCOO(left.values.size, List((0, 0.0)))) + } + IndexedRow(row1.rowIndex, func(row1.values, row2.values)) + } + ) + new DistributedRowMatrix(result, numRows, numCols) + } + + /** Adds this matrix to another matrix. + * + * @param other a [[DistributedRowMatrix]] to be added + */ + def add(other: DistributedRowMatrix): DistributedRowMatrix = { + val addFunction = (x: Vector, y: Vector) => (x.asBreeze + y.asBreeze).fromBreeze + this.byRowOperation(addFunction, other) + } + + /** Subtracts another matrix from this matrix. + * + * @param other a [[DistributedRowMatrix]] to be subtracted from this matrix + */ + def subtract(other: DistributedRowMatrix): DistributedRowMatrix = { + val subFunction = (x: Vector, y: Vector) => (x.asBreeze - y.asBreeze).fromBreeze + this.byRowOperation(subFunction, other) + } +} + +object DistributedRowMatrix { + + /** Builds a [[DistributedRowMatrix]] from a [[DataSet]] in COO. + * + * @param data [[DataSet]] which contains matrix elements in the form of + * (row index, column index, value) + * @param numRows Number of rows + * @param numCols Number of columns + * @param isSorted If false, sorts the row to properly build the matrix representation. + * If already sorted, set this parameter to true to skip sorting. + */ + def fromCOO(data: DataSet[(MatrixRowIndex, MatrixColIndex, Double)], + numRows: Int, + numCols: Int, + isSorted: Boolean = false + ): DistributedRowMatrix = { + val vectorData: DataSet[(MatrixRowIndex, SparseVector)] = data + .groupBy(0) + .reduceGroup(sparseRow => { + require(sparseRow.nonEmpty) + val sortedRow = + if (isSorted) { + sparseRow.toList + } else { + sparseRow.toList.sortBy(row => row._2) + } + val (indices, values) = sortedRow.map(x => (x._2, x._3)).unzip + (sortedRow.head._1, SparseVector(numCols, indices.toArray, values.toArray)) + }) + + val zippedData = vectorData.map(x => IndexedRow(x._1.toInt, x._2)) + + new DistributedRowMatrix(zippedData, numRows, numCols) + } +} + +/** Represents a row in row-major matrix. */ +case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[IndexedRow] { + def compare(other: IndexedRow) = this.rowIndex.compare(other.rowIndex) + + override def toString: String = s"($rowIndex, ${values.toString})" +} diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrixSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrixSuite.scala new file mode 100644 index 00000000000..598b0f37497 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrixSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class DistributedRowMatrixSuite extends FlatSpec with Matchers with FlinkTestBase { + behavior of "DistributedRowMatrix" + + val rawSampleData = List( + (0, 0, 3.0), + (0, 1, 3.0), + (0, 3, 4.0), + (2, 3, 4.0), + (1, 4, 3.0), + (1, 1, 3.0), + (2, 1, 3.0), + (2, 2, 3.0) + ) + + it should "contain the initialization data" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val rowDataset = env.fromCollection(rawSampleData) + val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5) + + dmatrix.toCOO.toSet.filter(_._3 != 0) shouldBe rawSampleData.toSet + } + + it should "return the correct dimensions when provided by the user" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val rowDataset = env.fromCollection(rawSampleData) + val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5) + + dmatrix.numCols shouldBe 5 + dmatrix.numRows shouldBe 3 + } + + + it should "return a sparse local matrix containing the initialization data" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val rowDataset = env.fromCollection(rawSampleData) + val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5) + + dmatrix.toLocalSparseMatrix.iterator.filter(_._3 != 0).toSet shouldBe rawSampleData.toSet + } + + it should "return a dense local matrix containing the initialization data" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val rowDataset = env.fromCollection(rawSampleData) + val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5) + + dmatrix.toLocalDenseMatrix.iterator.filter(_._3 != 0).toSet shouldBe rawSampleData.toSet + } + + "add" should "correctly add two distributed row matrices" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val rawSampleSum1 = List( + (0, 0, 1.0), + (7, 4, 3.0), + (0, 1, 8.0), + (2, 8, 12.0) + ) + + val rawSampleSum2 = List( + (0, 0, 2.0), + (3, 4, 4.0), + (2, 8, 8.0) + ) + + val addBlockMatrix1 = DistributedRowMatrix.fromCOO(env.fromCollection(rawSampleSum1), 10, 10) + val addBlockMatrix2 = DistributedRowMatrix.fromCOO(env.fromCollection(rawSampleSum2), 10, 10) + + val expected = List( + (0, 0, 3.0), + (0, 1, 8.0), + (3, 4, 4.0), + (2, 8, 20.0), + (7, 4, 3.0) + ) + val result = addBlockMatrix1 + .add(addBlockMatrix2) + .toLocalSparseMatrix + .filter(_._3 != 0.0) + result.toSet shouldEqual expected.toSet + } +} -- GitLab