Commit 62c3d509, authored by fobeligi, committed by Till Rohrmann

[FLINK-1809] Add Preprocessing package and Standardizer to ML-library

[FLINK-1809] Add documentation for Standard Scaler

[FLINK-1809] [ml] Integrates standard scaler documentation into the web site

This closes #579.
Parent 6a1edc8e
@@ -59,6 +59,7 @@ under the License.
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/ml/multiple_linear_regression.html">Multiple linear regression</a> <small>Beta</small></div></li>
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/ml/polynomial_base_feature_mapper.html">Polynomial Base Feature Mapper</a> <small>Beta</small></div></li>
<li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/ml/cocoa.html">CoCoA</a> <small>Beta</small></div></li>
<li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/ml/standard_scaler.html">Standard Scaler</a> <small>Beta</small></div></li>
<li><div class="sidenav-category">Advanced</div></li>
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/internal_general_arch.html">Architecture and Process Model</a></div></li>
......
---
mathjax: include
title: "Standard Scaler"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
## Description
The standard scaler scales the given data set so that all features have a user-specified mean and standard deviation.
In case the user does not provide a specific mean and standard deviation, the standard scaler transforms the features of the input data set to have mean equal to 0 and standard deviation equal to 1.
Given a set of input data $x_{1}, x_{2}, \ldots, x_{n}$, with mean:
$$\bar{x} = \frac{1}{n}\sum_{i=1}^{n}x_{i}$$
and standard deviation:
$$\sigma_{x}=\sqrt{ \frac{1}{n} \sum_{i=1}^{n}(x_{i}-\bar{x})^{2}}$$
The scaled data set $z_{1}, z_{2}, \ldots, z_{n}$ will be:
$$z_{i}= \textit{std} \left (\frac{x_{i} - \bar{x} }{\sigma_{x}}\right ) + \textit{mean}$$
where $\textit{std}$ and $\textit{mean}$ are the user-specified values for the standard deviation and mean.
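The scaling formula can be illustrated on a single feature column with plain Scala collections. This is only a minimal sketch, not the Flink implementation: the object name `StandardScalerSketch` and the method `scale` are hypothetical, the input is assumed to be non-empty, and a constant feature is divided by 1.0 instead of 0.0, mirroring the zero check in this commit's `extractFeatureMetrics`.

```scala
object StandardScalerSketch {
  /** Scales one feature column to the requested mean and standard deviation.
    * Hypothetical helper for illustration; assumes `xs` is non-empty.
    */
  def scale(xs: Seq[Double], mean: Double = 0.0, std: Double = 1.0): Seq[Double] = {
    val n = xs.size.toDouble
    val xBar = xs.sum / n
    // Population standard deviation: divides by n, as in the formula above
    val sigma = math.sqrt(xs.map(x => (x - xBar) * (x - xBar)).sum / n)
    // A constant feature has sigma == 0; divide by 1.0 instead to avoid NaN
    val divisor = if (sigma == 0.0) 1.0 else sigma
    xs.map(x => std * ((x - xBar) / divisor) + mean)
  }
}
```

For example, `StandardScalerSketch.scale(Seq(2.0, 4.0, 6.0), mean = 10.0, std = 2.0)` yields approximately `7.5505, 10.0, 12.4495`, since $\bar{x} = 4$ and $\sigma_{x} = \sqrt{8/3}$.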
## Parameters
The standard scaler implementation can be controlled by the following two parameters:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Parameters</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Mean</strong></td>
<td>
<p>
The mean of the scaled data set. (Default value: <strong>0.0</strong>)
</p>
</td>
</tr>
<tr>
<td><strong>Std</strong></td>
<td>
<p>
The standard deviation of the scaled data set. (Default value: <strong>1.0</strong>)
</p>
</td>
</tr>
</tbody>
</table>
## Examples
{% highlight scala %}
// Create standard scaler transformer
val scaler = StandardScaler()
.setMean(10.0)
.setStd(2.0)
// Obtain data set to be scaled
val dataSet: DataSet[Vector] = ...
// Scale the provided data set to have mean=10.0 and std=2.0
val scaledDS = scaler.transform(dataSet)
{% endhighlight %}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.preprocessing
import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
import org.apache.flink.api.common.functions._
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
/** Scales observations, so that all features have a user-specified mean and standard deviation.
* By default, the [[StandardScaler]] transformer uses mean=0.0 and std=1.0.
*
* This transformer takes a [[Vector]] of values and maps it to a
* scaled [[Vector]] such that each feature has a user-specified mean and standard deviation.
*
* This transformer can be prepended to all [[Transformer]] and
* [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
* [[Vector]].
*
* @example
* {{{
* val trainingDS: DataSet[Vector] = env.fromCollection(data)
* val transformer = StandardScaler().setMean(10.0).setStd(2.0)
*
* transformer.transform(trainingDS)
* }}}
*
* =Parameters=
*
* - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0
* - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default
* equal to 1
*/
class StandardScaler extends Transformer[Vector, Vector] with Serializable {
/** Sets the target mean of the transformed data
*
* @param mu the user-specified mean value.
* @return the StandardScaler instance with its mean value set to the user-specified value
*/
def setMean(mu: Double): StandardScaler = {
parameters.add(Mean, mu)
this
}
/** Sets the target standard deviation of the transformed data
*
* @param std the user-specified std value. A value of 0.0 is ignored, so the previously
* configured std (by default 1.0) is kept.
* @return the StandardScaler instance with its std value set to the user-specified value
*/
def setStd(std: Double): StandardScaler = {
if (std == 0.0) {
return this
}
parameters.add(Std, std)
this
}
override def transform(input: DataSet[Vector], parameters: ParameterMap):
DataSet[Vector] = {
val resultingParameters = this.parameters ++ parameters
val mean = resultingParameters(Mean)
val std = resultingParameters(Std)
val featureMetrics = extractFeatureMetrics(input)
input.map(new RichMapFunction[Vector, Vector]() {
var broadcastMean: linalg.Vector[Double] = null
var broadcastStd: linalg.Vector[Double] = null
override def open(parameters: Configuration): Unit = {
val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
linalg.Vector[Double])]("broadcastedMetrics").get(0)
broadcastMean = broadcastedMetrics._1
broadcastStd = broadcastedMetrics._2
}
override def map(vector: Vector): Vector = {
var myVector = vector.asBreeze
myVector -= broadcastMean
myVector :/= broadcastStd
myVector = (myVector :* std) + mean
return myVector.fromBreeze
}
}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
}
/** Calculates in one pass over the data the features' mean and standard deviation.
* For the calculation of the Standard deviation with one pass over the data,
* the Youngs & Cramer algorithm was used:
* [[http://www.cs.yale.edu/publications/techreports/tr222.pdf]]
*
*
* @param dataSet The data set for which we want to calculate mean and standard deviation
* @return DataSet containing a single tuple of two vectors (meanVector, stdVector).
* The first vector represents the mean vector and the second is the standard
* deviation vector.
*/
private def extractFeatureMetrics(dataSet: DataSet[Vector])
: DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
val metrics = dataSet.map{
v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
}.reduce{
(metrics1, metrics2) => {
/* We use formula 1.5b of the cited technical report to combine the partial
* sums of squares. With m and n the element counts of metrics1 and metrics2:
* temp1 = m / (n * (m + n))
* temp2 = n / m
*/
val temp1 = metrics1._1 / (metrics2._1 * (metrics1._1 + metrics2._1))
val temp2 = metrics2._1 / metrics1._1
val tempVector = (metrics1._2 * temp2) - metrics2._2
val tempS = (metrics1._3 + metrics2._3) + (tempVector :* tempVector) * temp1
(metrics1._1 + metrics2._1, metrics1._2 + metrics2._2, tempS)
}
}.map{
metric => {
val varianceVector = sqrt(metric._3 / metric._1)
for (i <- 0 until varianceVector.size) {
if (varianceVector(i) == 0.0) {
varianceVector.update(i, 1.0)
}
}
(metric._2 / metric._1, varianceVector)
}
}
metrics
}
}
object StandardScaler {
case object Mean extends Parameter[Double] {
override val defaultValue: Option[Double] = Some(0.0)
}
case object Std extends Parameter[Double] {
override val defaultValue: Option[Double] = Some(1.0)
}
def apply(): StandardScaler = {
new StandardScaler()
}
}
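As a reading aid for the reduce step in `extractFeatureMetrics`, the combination of two partial results can be sketched for a single scalar feature without Flink or Breeze. The names `PartialStatsSketch`, `Stats`, `single`, `combine` and `meanAndStd` are hypothetical and not part of this commit. The sketch only mirrors the arithmetic of the reduce function, where each partial result carries an element count, a sum, and a sum of squared deviations.

```scala
object PartialStatsSketch {
  /** Partial statistics of a chunk: element count, sum, sum of squared deviations. */
  final case class Stats(count: Double, sum: Double, s: Double)

  /** A single observation has count 1 and no deviation from its own mean. */
  def single(x: Double): Stats = Stats(1.0, x, 0.0)

  /** Combines two partial results with the same arithmetic as the commit's reduce step. */
  def combine(a: Stats, b: Stats): Stats = {
    val m = a.count
    val n = b.count
    val factor = m / (n * (m + n))    // temp1 in the commit
    val diff = a.sum * (n / m) - b.sum // (n / m) is temp2 in the commit
    Stats(m + n, a.sum + b.sum, a.s + b.s + factor * diff * diff)
  }

  /** Mean and population standard deviation; assumes `xs` is non-empty. */
  def meanAndStd(xs: Seq[Double]): (Double, Double) = {
    val total = xs.map(single).reduce(combine)
    (total.sum / total.count, math.sqrt(total.s / total.count))
  }
}
```

Because `combine` only needs the three numbers of each side, partial results from different partitions can be merged in any grouping, which is what allows the statistics to be computed in one distributed pass.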
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.preprocessing
import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{Vector, DenseVector}
import org.apache.flink.test.util.FlinkTestBase
import org.apache.flink.ml.math.Breeze._
import org.scalatest._
class StandardScalerITSuite
extends FlatSpec
with Matchers
with FlinkTestBase {
behavior of "Flink's Standard Scaler"
import StandardScalerData._
it should "scale the vectors to have mean equal to 0 and std equal to 1" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromCollection(data)
val transformer = StandardScaler()
val scaledVectors = transformer.transform(dataSet).collect
scaledVectors.length should equal(data.length)
val numberOfFeatures = scaledVectors(0).size
var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
for (vector <- scaledVectors) {
scaledMean += vector.asBreeze
}
scaledMean /= scaledVectors.size.asInstanceOf[Double]
for (vector <- scaledVectors) {
val temp = vector.asBreeze - scaledMean
scaledStd += temp :* temp
}
scaledStd /= scaledVectors.size.asInstanceOf[Double]
scaledStd = sqrt(scaledStd)
for (i <- 0 until numberOfFeatures) {
scaledMean(i) should be(0.0 +- (0.0000000000001))
scaledStd(i) should be(1.0 +- (0.0000000000001))
}
}
it should "scale the vectors to have mean equal to 10 and standard deviation equal to 2" in {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromCollection(data)
val transformer = StandardScaler().setMean(10.0).setStd(2.0)
val scaledVectors = transformer.transform(dataSet).collect
scaledVectors.length should equal(data.length)
val numberOfFeatures = scaledVectors(0).size
var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
for (vector <- scaledVectors) {
scaledMean += vector.asBreeze
}
scaledMean /= scaledVectors.size.asInstanceOf[Double]
for (vector <- scaledVectors) {
val temp = vector.asBreeze - scaledMean
scaledStd += temp :* temp
}
scaledStd /= scaledVectors.size.asInstanceOf[Double]
scaledStd = sqrt(scaledStd)
for (i <- 0 until numberOfFeatures) {
scaledMean(i) should be(10.0 +- (0.0000000000001))
scaledStd(i) should be(2.0 +- (0.0000000000001))
}
}
}
object StandardScalerData {
val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)),
DenseVector(Array(1600.00, 3.00)),
DenseVector(Array(2400.00, 3.00)),
DenseVector(Array(1416.00, 2.00)),
DenseVector(Array(3000.00, 4.00)),
DenseVector(Array(1985.00, 4.00)),
DenseVector(Array(1534.00, 3.00)),
DenseVector(Array(1427.00, 3.00)),
DenseVector(Array(1380.00, 3.00)),
DenseVector(Array(1494.00, 3.00)),
DenseVector(Array(1940.00, 4.00)),
DenseVector(Array(2000.00, 3.00)),
DenseVector(Array(1890.00, 3.00)),
DenseVector(Array(4478.00, 5.00)),
DenseVector(Array(1268.00, 3.00)),
DenseVector(Array(2300.00, 4.00)),
DenseVector(Array(1320.00, 2.00)),
DenseVector(Array(1236.00, 3.00)),
DenseVector(Array(2609.00, 4.00)),
DenseVector(Array(3031.00, 4.00)),
DenseVector(Array(1767.00, 3.00)),
DenseVector(Array(1888.00, 2.00)),
DenseVector(Array(1604.00, 3.00)),
DenseVector(Array(1962.00, 4.00)),
DenseVector(Array(3890.00, 3.00)),
DenseVector(Array(1100.00, 3.00)),
DenseVector(Array(1458.00, 3.00)),
DenseVector(Array(2526.00, 3.00)),
DenseVector(Array(2200.00, 3.00)),
DenseVector(Array(2637.00, 3.00)),
DenseVector(Array(1839.00, 2.00)),
DenseVector(Array(1000.00, 1.00)),
DenseVector(Array(2040.00, 4.00)),
DenseVector(Array(3137.00, 3.00)),
DenseVector(Array(1811.00, 4.00)),
DenseVector(Array(1437.00, 3.00)),
DenseVector(Array(1239.00, 3.00)),
DenseVector(Array(2132.00, 4.00)),
DenseVector(Array(4215.00, 4.00)),
DenseVector(Array(2162.00, 4.00)),
DenseVector(Array(1664.00, 2.00)),
DenseVector(Array(2238.00, 3.00)),
DenseVector(Array(2567.00, 4.00)),
DenseVector(Array(1200.00, 3.00)),
DenseVector(Array(852.00, 2.00)),
DenseVector(Array(1852.00, 4.00)),
DenseVector(Array(1203.00, 3.00))
)
}