$$ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \newcommand\rfrac[2]{^{#1}\!/_{#2}} \newcommand{\norm}[1]{\left\lVert#1\right\rVert} $$ # 解读pipeline的背后 ## 介绍 将不同的转换器和预测器链接在一起的能力是任何机器学习(ML)库的一个重要特性。在FlinkML中,我们希望提供直观的API,同时利用Scala语言的功能来实现pipelines的类型安全。我们希望实现的是一个易于使用的API,保护用户类型错误在飞行前(job启动之前),从而避免了将那些长时间运行的作业提交到集群,却只是看到它们由于ML pipeline中经常发生的一系列数据转换中的某些错误而失败。 在本指南中,我们将描述在FlinkML中实现可链转换器和预测器时所做的选择,并提供开发人员如何创建自己的算法来使用这些功能的指南。 ## 什么与为什么 那么"ML pipelines"是什么意思呢?pipelines在ML上下文可以被认为是一条链的操作,包含数据输入,执行一系列的数据转换 ,然后数据输出,要么被用作输入(特性)的预测功能,比如学习模式,或者只是输出转换后的数据来用于其他任务。最终学习者当然也可以成为pipelines的一部分。ML pipelines通常是复杂的操作集([深入解释](http://research.google.com/pubs/pub43146.html)),并且可能成为端到端学习系统的错误之源。 然后, ML pipeline 的目的是创建一个框架,用于管理这些操作链带来的复杂性。pipelines应该使开发人员更容易定义可应用于训练数据的链式转换,以便创建将用于培训学习模型的最终特性,然后执行相同的转换集,就像对未标记(测试)数据一样容易。pipelines还应该简化这些操作链上的交叉验证和模型选择。 最后,通过确保pipeline链中的连续链接“组合在一起”,我们还避免了代价高昂的类型错误。由于pipeline中的每个步骤都可能是一个计算量很大的操作,所以我们希望避免运行pipeline作业,除非我们确保pipeline中的所有输入/输出对都“匹配”。 ## pipelines在FlinkML FlinkML中pipeline的构建块可以在“ml.pipeline”中找到。FlinkML遵循了一个受[sklearn](http://scikit-learn.org)启发的API,这意味着我们有“Estimator”、“Transformer”和“Predictor”接口。要深入了解sklearn API的设计,感兴趣的读者可以参考[这里](http://arxiv.org/abs/1309.0238)。简而言之,“Estimator”是“Transformer”和“prediction”继承的基类。‘Estimator’定义了‘fit’方法,‘Transformer’也定义了‘transform’方法,‘predicator’定义了‘predict’方法。 “Estimator”的“fit”方法执行模型的实际训练,例如在线性回归任务中找到正确的权重,或者在特征定标器中找到数据的均值和标准差。顾名思义,实现“Transformer”的类是转换操作,如[缩放输入](standard_scaler.html)和“Predictor”实现是学习算法,如[多元线性回归](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/ml/multiple_linear_regression.html)。pipeline可以通过将多个转换器链接在一起来创建,pipeline中的最后一个链接可以是一个预测器或另一个转换器。以预测器结束的pipeline不能进一步链接。下面是一个pipeline如何形成的例子: ``` // Training data val input: DataSet[LabeledVector] = ... // Test data val unlabeled: DataSet[Vector] = ... val scaler = StandardScaler() val polyFeatures = PolynomialFeatures() val mlr = MultipleLinearRegression() // Construct the pipeline val pipeline = scaler .chainTransformer(polyFeatures) .chainPredictor(mlr) // Train the pipeline (scaler and multiple linear regression) pipeline.fit(input) // Calculate predictions for the testing data val predictions: DataSet[LabeledVector] = pipeline.predict(unlabeled) ``` 如前所述,FlinkML pipelines是类型安全的。如果我们试图将一个输出为'a'的转换器与另一个输入为'B'的转换器连接起来,那么在流转前如果'a' != 'B',就会出现错误。FlinkML通过使用Scala的隐式实现了这种类型安全。 ### Scala的隐式 如果您不熟悉Scala的隐式,我们可以推荐Martin Odersky的《Scala编程》中的[这段摘录](https://www.artima.com/pins1ed/implicitconversis-andparameters.html)。简而言之,隐式转换通过提供从一种类型到另一种类型的转换,从而支持Scala中的特殊多态性,隐式值为编译器提供的默认值,这些默认值可以通过隐式参数提供给函数调用。隐式转换和隐式参数的组合允许我们以类型安全的方式将转换和预测操作链接在一起。 ### 操作 如前所述,特性的(抽象类)“Estimator”定义了一个“fit”方法。该方法有两个参数列表(即是一个[curried函数](http://docs.scala-lang.org/tutorials/tour/currying.html))。第一个参数列表接受输入(训练)'数据集'和估计器的参数。第二个参数列表接受一个“隐式”参数,类型为“FitOperation”。“FitOperation”是一个类,它还定义了一个“fit”方法,这就是训练具体估计器的实际逻辑应该实现的地方。“Estimator”的“fit”方法本质上是“FitOperation”的“fit”方法的包装。“predict”的“预测”方法和“transform”的“变换”方法采用类似的方法设计,具有各自的操作类。 在这些方法中,操作对象作为隐式参数提供。Scala将在类型的伙伴对象中[寻找隐式](http://docs.scala-lang.org/tutorials/FAQ/finding-implicits.html),因此实现这些接口的类应该在伙伴对象中提供这些对象作为隐式对象。 作为一个例子,我们可以看看“StandardScaler”类。'StandardScaler'扩展了'Transformer',因此它可以访问'fit'和'transform'函数。这两个函数期望“FitOperation”和“TransformOperation”对象分别作为“fit”和“transform”方法的隐式参数,而“StandardScaler”通过“transformVectors”和“fitVectorStandardScaler”在它的伙伴对象中提供了这两个方法: ``` class StandardScaler extends Transformer[StandardScaler] { ... } object StandardScaler { ... implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] { override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T]) : Unit = { ... } implicit def transformVectors[T <: Vector: VectorConverter: TypeInformation: ClassTag] = { new TransformOperation[StandardScaler, T, T] { override def transform( instance: StandardScaler, transformParameters: ParameterMap, input: DataSet[T]) : DataSet[T] = { ... } } ``` 注意,“StandardScaler”**不**覆盖“Estimator”的“fit”方法或“Transformer”的“transform”方法。相反,它对“FitOperation”和“TransformOperation”的实现覆盖了它们各自的“fit”和“transform”方法,然后由“Estimator”和“Transformer”的“fit”和“transform”方法调用这些方法。类似地,实现“Predictor”的类应该在其同伴对象中定义一个隐式的“PredictOperation”对象。 #### 类型及类型安全 除了上面列出的'fit'和'transform'操作外,'StandardScaler'还为'LabeledVector'类型的输入提供'fit'和'transform'操作。这允许我们对标记或未标记的输入使用算法,这是自动发生的,取决于我们给fit和transform操作输入的类型。编译器根据输入类型选择正确的隐式操作。 如果我们尝试使用不支持的类型调用“fit”或“transform”方法,我们将在作业启动之前得到一个运行时错误。虽然也有可能在编译时捕获这类错误,但是我们能够提供给用户的错误消息信息量要少得多,这就是为什么我们选择抛出运行时异常。 ### 链接 链接是通过在实现“Transformer”的类的对象上调用“chainTransformer”或“chainPredictor”来实现的。这些方法分别返回一个“ChainedTransformer”或“ChainedPredictor”对象。正如我们提到的,“ChainedTransformer”对象可以被进一步链接,而“chainedprediction”对象不能。这些类负责对一对连续变压器或变压器和预测器应用拟合、转换和预测操作。如果链的长度大于2,它们也会递归地起作用,因为每个“ChainedTransformer”都定义了一个“transform”和“fit”操作,可以用更多的变压器或预测器进一步链接这些操作。 需要注意的是,开发人员和用户在实现算法时不需要担心链接,所有这些都是由FlinkML自动处理的。 ### 如何实现Pipeline操作 为了支持FlinkML的pipeline,算法必须遵循一定的设计模式,我们将在本节中对此进行描述。假设我们要实现一个管道操作符,它可以更改数据的平均值。由于聚合数据是许多分析pipeline中常用的预处理步骤,我们将其作为“Transformer”来实现。因此,我们首先创建一个“MeanTransformer”类,它继承自“Transformer” ``` class MeanTransformer extends Transformer[MeanTransformer] {} ``` 由于我们希望能够配置结果数据的平均值,所以必须添加一个配置参数。 ``` class MeanTransformer extends Transformer[MeanTransformer] { def setMean(mean: Double): this.type = { parameters.add(MeanTransformer.Mean, mean) this } } object MeanTransformer { case object Mean extends Parameter[Double] { override val defaultValue: Option[Double] = Some(0.0) } def apply(): MeanTransformer = new MeanTransformer } ``` 参数在transformer类的伙伴对象中定义,并扩展“Parameter”类。由于参数实例应该充当参数映射的不可变键,所以它们应该实现为“case对象”。如果此组件的用户没有设置其他值,则将使用默认值。如果没有指定默认值,即' defaultValue = None ',那么算法必须相应地处理这种情况。 我们现在可以实例化一个' MeanTransformer '对象并设置转换后数据的平均值。但是我们仍然必须实现转换的工作方式。工作流可以分为两个阶段。在第一阶段,转换器学习给定训练数据的平均值。然后,可以在第二阶段使用该知识将提供的数据与配置的结果平均值进行转换。 均值的学习可以在我们的“Transformer”的“fit”操作中实现,它是从“Estimator”继承而来的。在“fit”操作中,pipeline组件根据给定的训练数据进行训练。然而,该算法不是通过覆盖“fit”方法来实现的,而是通过为正确的类型提供相应的“FitOperation”的实现来实现的。查看一下“Estimator”中“fit”方法的定义,它是“Transformer”的父类,揭示了为什么会出现这种情况。 ``` trait Estimator[Self] extends WithParameters with Serializable { that: Self => def fit[Training]( training: DataSet[Training], fitParameters: ParameterMap = ParameterMap.Empty) (implicit fitOperation: FitOperation[Self, Training]): Unit = { FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment) fitOperation.fit(this, fitParameters, training) } } ``` 我们看到,“fit”方法使用类型为“Training”的输入数据集、可选参数列表和类型为“FitOperation”的隐式参数列表调用。在函数体中,首先注册一些机器学习类型,然后调用“FitOperation”参数的“fit”方法。实例将自己、参数映射和训练数据集作为方法的参数。因此,所有的程序逻辑都发生在“FitOperation”中。 “FitOperation”有两个类型参数。第一个定义pipeline操作符类型,这个“FitOperation”将为其工作,第二个类型参数定义数据集元素的类型。如果我们首先想实现“MeanTransformer”来在“DenseVector”上工作,那么我们必须为“FitOperation[MeanTransformer, DenseVector]”提供一个实现。 ``` val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { import org.apache.flink.ml.math.Breeze._ val meanTrainingData: DataSet[DenseVector] = input .map{ x => (x.asBreeze, 1) } .reduce{ (left, right) => (left._1 + right._1, left._2 + right._2) } .map{ p => (p._1/p._2).fromBreeze } } } ``` “FitOperation[T, I]”有一个“fit”方法,该方法通过类型为“T”的实例、一个参数映射和一个输入“DataSet[I]”来调用。在我们的例子中,'T=MeanTransformer'和'I=DenseVector'。如果我们的拟合步骤依赖于一些在“Transformer”创建时没有直接给出的参数值,则需要参数映射。“MeanTransformer”的“FitOperation”对给定输入数据集的“DenseVector”实例进行求和,并将结果除以向量的总数。这样,我们就得到了一个'DataSet[DenseVector]',其中只有一个元素,即平均值。 但是如果我们仔细观察实现,我们会发现平均值计算的结果从来没有存储在任何地方。如果我们想在以后的步骤中使用这些知识来调整其他一些输入的平均值,我们必须保持它不变。这里是“fit”方法的“MeanTransformer”类型的参数发挥作用的地方。我们可以使用这个实例来存储状态,它由后续的“transform”操作使用,该操作对相同的对象起作用。但首先我们必须通过一个成员字段扩展“MeanTransformer”,然后调整“FitOperation”实现。 ``` class MeanTransformer extends Transformer[Centering] { var meanOption: Option[DataSet[DenseVector]] = None def setMean(mean: Double): Mean = { parameters.add(MeanTransformer.Mean, mu) } } val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { import org.apache.flink.ml.math.Breeze._ instance.meanOption = Some(input .map{ x => (x.asBreeze, 1) } .reduce{ (left, right) => (left._1 + right._1, left._2 + right._2) } .map{ p => (p._1/p._2).fromBreeze }) } } ``` 如果我们查看“Transformer”中的“transform”方法,我们将看到我们还需要实现“TransformOperation”。一种可能的转换实现方法可能如下所示。 ``` val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] { override def transform( instance: MeanTransformer, transformParameters: ParameterMap, input: DataSet[DenseVector]) : DataSet[DenseVector] = { val resultingParameters = parameters ++ transformParameters val resultingMean = resultingParameters(MeanTransformer.Mean) instance.meanOption match { case Some(trainingMean) => { input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean") } case None => throw new RuntimeException("MeanTransformer has not been fitted to data.") } } } class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] { var trainingMean: DenseVector = null override def open(parameters: Configuration): Unit = { trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0) } override def map(vector: DenseVector): DenseVector = { import org.apache.flink.ml.math.Breeze._ val result = vector.asBreeze - trainingMean.asBreeze + resultingMean result.fromBreeze } } ``` 现在我们已经实现了所有的东西来将我们的“MeanTransformer”匹配到“DenseVector”实例的训练数据集,并对它们进行转换。然而,当我们执行“fit”操作时 ``` val trainingData: DataSet[DenseVector] = ... val meanTransformer = MeanTransformer() meanTransformer.fit(trainingData) ``` 我们在运行时收到以下错误:“没有为训练在数据集中的类MeanTransformer定义FitOperation [org.apache.flink.ml.math.DenseVector]”。原因是Scala编译器无法为“fit”方法的隐式参数找到具有正确类型参数的“FitOperation”值。因此,它选择了一个回退隐式值,该值在运行时给出此错误消息。为了让编译器知道我们的实现,我们必须将它定义为一个隐式的值,并将它放在“MeanTransformer”同伴对象的范围内。 ``` object MeanTransformer{ implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ... implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ... } ``` 现在我们可以调用'fit'和'transform'我们的'MeanTransformer'与'DataSet[DenseVector]'作为输入。此外,我们现在可以使用这个转换器作为分析pipeline的一部分,其中我们有一个“DenseVector”作为输入和预期输出。 ``` val trainingData: DataSet[DenseVector] = ... val mean = MeanTransformer.setMean(1.0) val polyFeatures = PolynomialFeatures().setDegree(3) val pipeline = mean.chainTransformer(polyFeatures) pipeline.fit(trainingData) ``` 值得注意的是,启用链接不需要额外的代码。系统使用各个组件的操作自动构造pipeline逻辑。 到目前为止,一切与“DenseVector”工作良好。但是,如果我们用'LabeledVector'调用转换器,会发生什么呢? ``` val trainingData: DataSet[LabeledVector] = ... val mean = MeanTransformer() mean.fit(trainingData) ``` 和前面一样,我们在执行程序时看到了下面的异常:“There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]”。值得注意的是,这个异常是在流转前阶段抛出的,这意味着作业还没有提交到运行时系统。这样做的好处是,不会看到一个作业运行了几天,然后由于不兼容的pipeline组件而失败。因此,对于整个job,需要在一开始就要检查类型兼容性。 为了使“MeanTransformer”也能在“LabeledVector”上工作,我们必须提供相应的操作。因此,我们必须定义一个“FitOperation[MeanTransformer, LabeledVector]”和“TransformOperation[MeanTransformer, LabeledVector, LabeledVector]”作为“MeanTransformer”的伙伴对象范围内的隐式值。 ``` object MeanTransformer { implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ... implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ... } ``` 如果我们想实现一个“Predictor”而不是“Transformer”,那么我们还必须提供一个“FitOperation”。此外,一个“Predictor”需要一个“PredictOperation”,它实现了如何从测试数据计算预测值。