在本节中,我们介绍了 ML Pipelines 的概念。ML Pipelines 提供了构建在 DataFrames 之上的一系列上层 API,帮助用户创建和调整切合实际的 ML Pipelines.
目录
MLlib 将机器学习算法的API标准化,以便将多种算法更容易地组合成单个 Pipeline (管道)或者工作流。本节介绍Pipelines API 的关键概念,其中 Pipeline(管道)的概念主要是受到 scikit-learn 项目的启发.
Transformer
s 和 Estimator
s 绑在一起形成一个工作流.Transformer
s 和 Estimator
s 都已经使用标准的 API 来指定参数.
机器学习可以应用于各种各样的数据类型,比如向量,文本,图形和结构化数据.API 采用 Spark Sql 的 DataFrame 就是为了支持各种各样的数据类型.
DataFrame 支持许多基本的结构化的数据,参考Spark SQL datatype reference上的一系列支持的类型。另外除了 Spark SQL guide 列举的类型,DataFrame 还支持使用 ML Vector 类型.
DataFrame 可以用标准的 RDD 显式或者非显式创建。请参考下面的例子,或者前往 Spark SQL programming guide 查看例子
DataFrame 中的列是有名称的.下面的代码示例会使用名称如 "文本","特征"和"标签".
转换器是特征变换和机器学习模型的抽象。转换器必须实现transform方法,这个方法将一个 DataFrame 转换成另一个 DataFrame,通常是附加一个或者多个列。比如:
Estimators 模型学习器是拟合和训练数据的机器学习算法或者其他算法的抽象。技术上来说, Estimator(模型学习器)实现 fit() 方法,这个方法输入一个 DataFrame 并产生一个 Model 即一个 Transformer(转换器)。举个例子,一个机器学习算法是一个 Estimator 模型学习器,比如这个算法是 LogisticRegression(逻辑回归),调用 fit() 方法训练出一个 LogisticRegressionModel,这是一个 Model,因此也是一个 Transformer(转换器).
Transformer.transform() 和 Estimator.fit() 都是无状态。以后,可以通过替换概念来支持有状态算法.
每一个 Transformer(转换器)和 Estimator (模型学习器)都有一个唯一的ID,这在指定参数上非常有用(将在下面进行讨论).
在机器学习中,通常会执行一系列算法来处理和学习模型,比如,一个简单的文本文档处理流程可能包括这几个步骤:
MLlib 代表一个流水线,就是一个 Pipeline(管道),Pipeline(管道) 包含了一系列有特定顺序的管道步骤(Transformer
s(转换器) 和 Estimator
s(模型学习器))。我们将使用这个简单的工作流作为本节的运行示例.
一个 pipeline 由多个步骤组成,每一个步骤都是一个 Transformer(转换器)或者 Estimator(模型学习器)。这些步骤按顺序执行,输入的 DataFrame 在通过每个阶段时进行转换。在 Transformer (转换器)步骤中,DataFrame 会调用 transform() 方法;在 Estimator(模型学习器)步骤中,fit()
方法被调用并产生一个 Transformer(转换器)(会成为 PipelineModel(管道模型)的一部分,或者适配 Pipeline ),并且 DataFrame 会调用这个 Transformer
’s(转换器)的transform()方法.
我们通过简单的文本文档工作流来说明.
下图是显示了使用管道的训练流程
以上,顶部的一行代表 Pipeline(管道)有三个步骤。第一第二个(Tokenizer(分词器)和 HashingTF(词频))是 Transformer
s(转换器)(蓝色的),第三个是 Estimator(模型学习器)(红色的)。底部的一行表示流经管道的数据,其中柱面表示 DataFrame。最初的 DataFrame 有少量的文本文档和标签, 会调用 Pipeline.fit() 方法.Tokenizer.transform()方法将原始文本分割成单词,并将这些单词作为一列添加到 DataFrame。接下HashingTF.transform() 方法将单词列转换成特征向量,并向 DataFrame 添加带有这些向量的新列。现在,由于LogisticRegression(逻辑回归)是一个模型学习器,Pipeline 会首先调用 LogisticRegression.fit()
来生成一个LogisticRegressionModel,然后 DataFrame 再调用 LogisticRegressionModel
’s transform() 方法在 DataFrame传送到下一个步骤之前
。
同时 Pipeline 也是一个模型学习器。因此,Pipeline
’s fit() 运行完之后,会生成一个 PipelineModel,即一个Transformer(转换器),PipelineModel 是用来在测试的时候使用
。
下图说明了这种用法
在上图中,PipelineModel 有和原始的 Pipeline(管道)一样的步骤,但是在原始 Pipeline 所有的 Estimator
s (模型学习器)都会变成 Transformer
s(转换器)。当在测试集上调用 PipelineModel
’s transform() 方法时,数据会按顺序通过装配的管道。每个步骤的
transform()
方法都会更新数据集并将 DataFrame 传递到下一个步骤.
Pipeline 和 PipelineModel 有助于确保了训练集和测试集经过相同的处理步骤.
DAG(有向无环图)Pipeline
s:管道的步骤被定义为一个有序的数组。上面的例子是一个线性的管道即 Pipeline
s每个步骤使用的数据都产生于前一个步骤。只要数据流图能构成有向无环图,非线性 Pipelines 也可以创建出来。图当前是通过输每个步骤的输入输出列名来隐含指定的(通常指定为参数).如果 Pipeline 构成了一个 DAG,则按照拓扑顺序指定阶段.
Runtime checking(运行时检查):由于管道可以对具有不同类型的 DataFrames 进行操作,因此不能使用编译时类型检查。Pipeline
s 和PipelineModel
s 在真正运行 Pipeline 之前进行运行时检查。这个类型检查通过 DataFrame schema(描述了 DataFrame 列的数据类型)来完成的.
Unique Pipeline stages(唯一性管道阶段):一个管道的阶段必须是一个唯一实例,比如,相同的实例 myHashingTF不能插入到 Pipeline 两次,因为管道的阶段的 ID 必须是唯一的。但是,不同的实例 myHashingTF1 和myHashingTF2(都是 HashingTF 类型)可以放进相同的 Pipeline,只要通过不同的 ID 创建不同的实例.
MLib 的 Estimator
s(模型学习器)和 Transformer
s(转换器)使用统一的 API 来指定参数.
Param 是具有自包含定义的参数,ParamMap 是一组(参数,值)对.
将参数传递给算法主要有两种方式
lr.setMaxIter(10)
使 lr.fit()
最多10次迭代。这个 API 类似于 spark.mlib 包中的 API.fit()
函数和 transform()
函数。ParamMap 的任意参数将会覆盖前面调用实例通过 setter 方法指定的参数.参数属于特定的 Estimator
s(模型学习器)和 Transformer
s(转换器)的实例。例如,如果我们有两个 LogisticRegression 实例 lr1 和 lr2,然后我们可以使用指定的 maxIter 参数来构建ParamMap:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。如果 Pipeline 中存在两个 maxIter 参数的算法,这就非常有用.
通常情况下,将模型或管道保存到磁盘上以供将来使用是值得的。在 Spark 1.6,模型导入导出功能被加入到 Pipeline API。大多数基本的 transformers(转换器)是被支持的,包括一些更基本的ML Models。请根据算法 API 文档判断是否支持保存和加载.
该例子涉及到 Estimator(训练器)、Transformer(转换器)和 Param(参数)的概念
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.Row // Prepare training data from a list of (label, features) tuples. val training = spark.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features") // Create a LogisticRegression instance. This instance is an Estimator. val lr = new LogisticRegression() // Print out the parameters, documentation, and any default values. println("LogisticRegression parameters:\n" + lr.explainParams() + "\n") // We may set parameters using setter methods. lr.setMaxIter(10) .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. val model1 = lr.fit(training) // Since model1 is a Model (i.e., a Transformer produced by an Estimator), // we can view the parameters it used during fit(). // This prints the parameter (name: value) pairs, where names are unique IDs for this // LogisticRegression instance. println("Model 1 was fit using parameters: " + model1.parent.extractParamMap) // We may alternatively specify parameters using a ParamMap, // which supports several methods for specifying parameters. val paramMap = ParamMap(lr.maxIter -> 20) .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. // One can also combine ParamMaps. val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name. val paramMapCombined = paramMap ++ paramMap2 // Now learn a new model using the paramMapCombined parameters. // paramMapCombined overrides all parameters set earlier via lr.set* methods. val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap) // Prepare test data. val test = spark.createDataFrame(Seq( (1.0, Vectors.dense(-1.0, 1.5, 1.3)), (0.0, Vectors.dense(3.0, 2.0, -0.1)), (1.0, Vectors.dense(0.0, 2.2, -1.5)) )).toDF("label", "features") // Make predictions on test data using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. // Note that model2.transform() outputs a 'myProbability' column instead of the usual // 'probability' column since we renamed the lr.probabilityCol parameter previously. model2.transform(test) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => println(s"($features, $label) -> prob=$prob, prediction=$prediction") }
参考:
[`Estimator` Scala docs](api/scala/index.html#org.apache.spark.ml.Estimator),
[`Transformer` Scala docs](api/scala/index.html#org.apache.spark.ml.Transformer) 和
[`Params` Scala docs](api/scala/index.html#org.apache.spark.ml.param.Params)了解API的详细信息。
在Spark repo中的“examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala”中查找完整示例代码。
该示例遵循上述附图中所示的简单文本文档管道。
import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.Row // Prepare training documents from a list of (id, text, label) tuples. val training = spark.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0) )).toDF("id", "text", "label") // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words") val hashingTF = new HashingTF() .setNumFeatures(1000) .setInputCol(tokenizer.getOutputCol) .setOutputCol("features") val lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.001) val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. val model = pipeline.fit(training) // Now we can optionally save the fitted pipeline to disk model.write.overwrite().save("/tmp/spark-logistic-regression-model") // We can also save this unfit pipeline to disk pipeline.write.overwrite().save("/tmp/unfit-lr-model") // And load it back in during production val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model") // Prepare test documents, which are unlabeled (id, text) tuples. val test = spark.createDataFrame(Seq( (4L, "spark i j k"), (5L, "l m n"), (6L, "spark hadoop spark"), (7L, "apache hadoop") )).toDF("id", "text") // Make predictions on test documents. model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction") }
有关API的详细信息,请参阅[`Pipeline` Scala docs](api/scala/index.html#org.apache.spark.ml.Pipeline)
在Spark repo中的“examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala”中查找完整示例代码。