提交 63504a3c 编写于 作者: C chobeat 提交者: Chiwan Park

[FLINK-3919] [ml] Implement DistributedRowMatrix

This closes #1996.
上级 08b075aa
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.math.distributed
/** Common trait used by distributed data structures representing a matrix. */
trait DistributedMatrix {
/** Returns number of rows in matrix.
*
* @return Number of rows
*/
def numRows: Int
/** Returns number of columns in matrix.
*
* @return Number of columns
*/
def numCols: Int
}
object DistributedMatrix {
type MatrixColIndex = Int
type MatrixRowIndex = Int
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.math.distributed
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.distributed.DistributedMatrix._
import org.apache.flink.ml.math._
/** Represents distributed row-major matrix.
*
* @param data [[DataSet]] which contains [[IndexedRow]]s
* @param numRows Number of rows
* @param numCols Number of columns
*/
class DistributedRowMatrix(
val data: DataSet[IndexedRow],
val numRows: Int,
val numCols: Int
) extends DistributedMatrix {
/** Collects the data in the form of a sequence of coordinates associated with their values.
* This operation immediately triggers program execution.
*/
def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = {
val localRows = data.collect()
for {
IndexedRow(rowIndex, vector) <- localRows
(columnIndex, value) <- vector
} yield (rowIndex, columnIndex, value)
}
/** Collects the data in the form of a SparseMatrix. This operation immediately triggers program
* execution.
*/
def toLocalSparseMatrix: SparseMatrix = {
val localMatrix = SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO)
require(localMatrix.numRows == this.numRows)
require(localMatrix.numCols == this.numCols)
localMatrix
}
// TODO: convert to dense representation on the distributed matrix and collect it afterward
/** Collects the data in the form of a DenseMatrix. This operation immediately triggers program
* execution.
*/
def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
/** Applies a high-order function to couple of rows.
*
* @param func a function to be applied
* @param other a [[DistributedRowMatrix]] to apply the function together
*/
def byRowOperation(
func: (Vector, Vector) => Vector,
other: DistributedRowMatrix
): DistributedRowMatrix = {
val otherData = other.data
require(this.numCols == other.numCols)
require(this.numRows == other.numRows)
val result = this.data
.fullOuterJoin(otherData)
.where("rowIndex")
.equalTo("rowIndex")(
(left: IndexedRow, right: IndexedRow) => {
val row1 = Option(left) match {
case Some(row: IndexedRow) => row
case None =>
IndexedRow(right.rowIndex, SparseVector.fromCOO(right.values.size, List((0, 0.0))))
}
val row2 = Option(right) match {
case Some(row: IndexedRow) => row
case None =>
IndexedRow(left.rowIndex, SparseVector.fromCOO(left.values.size, List((0, 0.0))))
}
IndexedRow(row1.rowIndex, func(row1.values, row2.values))
}
)
new DistributedRowMatrix(result, numRows, numCols)
}
/** Adds this matrix to another matrix.
*
* @param other a [[DistributedRowMatrix]] to be added
*/
def add(other: DistributedRowMatrix): DistributedRowMatrix = {
val addFunction = (x: Vector, y: Vector) => (x.asBreeze + y.asBreeze).fromBreeze
this.byRowOperation(addFunction, other)
}
/** Subtracts another matrix from this matrix.
*
* @param other a [[DistributedRowMatrix]] to be subtracted from this matrix
*/
def subtract(other: DistributedRowMatrix): DistributedRowMatrix = {
val subFunction = (x: Vector, y: Vector) => (x.asBreeze - y.asBreeze).fromBreeze
this.byRowOperation(subFunction, other)
}
}
object DistributedRowMatrix {
/** Builds a [[DistributedRowMatrix]] from a [[DataSet]] in COO.
*
* @param data [[DataSet]] which contains matrix elements in the form of
* (row index, column index, value)
* @param numRows Number of rows
* @param numCols Number of columns
* @param isSorted If false, sorts the row to properly build the matrix representation.
* If already sorted, set this parameter to true to skip sorting.
*/
def fromCOO(data: DataSet[(MatrixRowIndex, MatrixColIndex, Double)],
numRows: Int,
numCols: Int,
isSorted: Boolean = false
): DistributedRowMatrix = {
val vectorData: DataSet[(MatrixRowIndex, SparseVector)] = data
.groupBy(0)
.reduceGroup(sparseRow => {
require(sparseRow.nonEmpty)
val sortedRow =
if (isSorted) {
sparseRow.toList
} else {
sparseRow.toList.sortBy(row => row._2)
}
val (indices, values) = sortedRow.map(x => (x._2, x._3)).unzip
(sortedRow.head._1, SparseVector(numCols, indices.toArray, values.toArray))
})
val zippedData = vectorData.map(x => IndexedRow(x._1.toInt, x._2))
new DistributedRowMatrix(zippedData, numRows, numCols)
}
}
/** Represents a row in row-major matrix. */
case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[IndexedRow] {
def compare(other: IndexedRow) = this.rowIndex.compare(other.rowIndex)
override def toString: String = s"($rowIndex, ${values.toString})"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.math.distributed
import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase
import org.scalatest.{FlatSpec, Matchers}
class DistributedRowMatrixSuite extends FlatSpec with Matchers with FlinkTestBase {
behavior of "DistributedRowMatrix"
val rawSampleData = List(
(0, 0, 3.0),
(0, 1, 3.0),
(0, 3, 4.0),
(2, 3, 4.0),
(1, 4, 3.0),
(1, 1, 3.0),
(2, 1, 3.0),
(2, 2, 3.0)
)
it should "contain the initialization data" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val rowDataset = env.fromCollection(rawSampleData)
val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5)
dmatrix.toCOO.toSet.filter(_._3 != 0) shouldBe rawSampleData.toSet
}
it should "return the correct dimensions when provided by the user" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val rowDataset = env.fromCollection(rawSampleData)
val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5)
dmatrix.numCols shouldBe 5
dmatrix.numRows shouldBe 3
}
it should "return a sparse local matrix containing the initialization data" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val rowDataset = env.fromCollection(rawSampleData)
val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5)
dmatrix.toLocalSparseMatrix.iterator.filter(_._3 != 0).toSet shouldBe rawSampleData.toSet
}
it should "return a dense local matrix containing the initialization data" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val rowDataset = env.fromCollection(rawSampleData)
val dmatrix = DistributedRowMatrix.fromCOO(rowDataset, 3, 5)
dmatrix.toLocalDenseMatrix.iterator.filter(_._3 != 0).toSet shouldBe rawSampleData.toSet
}
"add" should "correctly add two distributed row matrices" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val rawSampleSum1 = List(
(0, 0, 1.0),
(7, 4, 3.0),
(0, 1, 8.0),
(2, 8, 12.0)
)
val rawSampleSum2 = List(
(0, 0, 2.0),
(3, 4, 4.0),
(2, 8, 8.0)
)
val addBlockMatrix1 = DistributedRowMatrix.fromCOO(env.fromCollection(rawSampleSum1), 10, 10)
val addBlockMatrix2 = DistributedRowMatrix.fromCOO(env.fromCollection(rawSampleSum2), 10, 10)
val expected = List(
(0, 0, 3.0),
(0, 1, 8.0),
(3, 4, 4.0),
(2, 8, 20.0),
(7, 4, 3.0)
)
val result = addBlockMatrix1
.add(addBlockMatrix2)
.toLocalSparseMatrix
.filter(_._3 != 0.0)
result.toSet shouldEqual expected.toSet
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册