提交 fabbad5e 编写于 作者: W wizardforcel

2021-03-08 21:42:42

上级 88fabc55
......@@ -640,7 +640,7 @@ print (DoSomething(SquareIt, 3))
# Lambda 函数-函数式编程
lambda 函数的概念是另外一种类似于 Python 的事情,在其他语言中可能看不到,这就是**函数式编程**。 这个想法是您可以在函数中包含一个简单函数。 举个例子,这最有意义:
Lambda 函数的概念是另外一种类似于 Python 的事情,在其他语言中可能看不到,这就是**函数式编程**。 这个想法是您可以在函数中包含一个简单函数。 举个例子,这最有意义:
```py
#Lambda functions let you inline simple functions
......@@ -657,7 +657,7 @@ print (DoSomething(lambda x: x * x * x, 3))
我们将打印`DoSomething`,并记住我们的第一个参数是一个函数,因此,除了传递命名函数外,我还可以使用`lambda`关键字内联声明此函数。 Lambda 基本上意味着我正在定义一个仅存在的未命名函数。 它是暂时的,它采用参数`x`。 在这里的语法中,`lambda`表示我正在定义某种内联函数,然后定义其参数列表。 它只有一个参数`x`和冒号,其后是该函数实际执行的操作。 我将使用`x`参数,并将其自身乘以三倍,以基本获得参数的立方。
在此示例中,`DoSomething`将作为第一个参数传入此 lambda 函数,该函数将计算`x``3`参数的立方。 那么,这实际上是在做什么呢? 该`lambda`函数本身就是一个函数,该函数在上一示例中传递到`DoSomething`中的`f`中,此处的`x`将是`3`。 这将返回`x``f`,最终将对值`3`执行我们的 lambda 函数。 这样`3`进入我们的`x`参数,然后我们的 lambda 函数将其转换为`3`乘以`3`乘以`3`,当然也就是`27`
在此示例中,`DoSomething`将作为第一个参数传入此 Lambda 函数,该函数将计算`x``3`参数的立方。 那么,这实际上是在做什么呢? 该`lambda`函数本身就是一个函数,该函数在上一示例中传递到`DoSomething`中的`f`中,此处的`x`将是`3`。 这将返回`x``f`,最终将对值`3`执行我们的 Lambda 函数。 这样`3`进入我们的`x`参数,然后我们的 Lambda 函数将其转换为`3`乘以`3`乘以`3`,当然也就是`27`
现在,当我们开始进行 MapReduce 和 Spark 之类的工作时,就会遇到很多问题。 因此,如果以后要处理 Hadoop 的各种技术,这是一个非常重要的概念。 再次,我鼓励您花些时间让它沉入其中,并根据需要了解发生了什么。
......
......@@ -352,9 +352,9 @@ rdd.map(lambda x: x*x)
```
假设我只是从列表 1、2、3、4 中创建了一个 RDD,然后可以使用`x`lambda 函数调用`rdd.map()`,该函数在每一行(即该 RDD 的每个值)中都称为`x`, 然后将函数`x`乘以`x`平方。 如果然后我要收集该 RDD 的输出,它将是 1、4、9 和 16,因为它将采用该 RDD 的每个单独的条目并将其平方,然后将其放入新的 RDD 中。
假设我只是从列表 1、2、3、4 中创建了一个 RDD,然后可以使用`x`Lambda 函数调用`rdd.map()`,该函数在每一行(即该 RDD 的每个值)中都称为`x`, 然后将函数`x`乘以`x`平方。 如果然后我要收集该 RDD 的输出,它将是 1、4、9 和 16,因为它将采用该 RDD 的每个单独的条目并将其平方,然后将其放入新的 RDD 中。
如果您不记得什么是 lambda 函数,我们在本书的前面部分就已经讨论过了,但是作为回顾,lambda 函数只是在线定义函数的简写。 因此`rdd.map(lambda x: x*x)`与单独的函数`def squareIt(x): return x*x`完全相同,并且说`rdd.map(squareIt)`
如果您不记得什么是 Lambda 函数,我们在本书的前面部分就已经讨论过了,但是作为回顾,Lambda 函数只是在线定义函数的简写。 因此`rdd.map(lambda x: x*x)`与单独的函数`def squareIt(x): return x*x`完全相同,并且说`rdd.map(squareIt)`
这只是您要作为转换传递的非常简单的函数的简写。 它消除了实际将其声明为自己的单独命名函数的需要。 这就是函数编程的全部思想。 因此,可以说您现在已经了解函数式编程! 但实际上,这只是用于将内联函数定义为`map()`函数的参数或任何相关转换的简写形式。
......@@ -543,7 +543,7 @@ rawData = rawData.filter(lambda x:x != header)
现在,我将使用`first`函数从该 RDD 中提取第一行,即第一行。 因此,现在标题 RDD 将包含一个条目,该条目仅是列标题的这一行。 现在,看看上面的代码中发生了什么,我在包含该 CSV 文件中所有信息的原始数据上使用`filter`,并且正在定义`filter`函数,该函数将仅允许行通过 如果该行不等于该初始标题行的内容。 我在这里所做的是,我已经获取了原始 CSV 文件,并通过仅允许与第一行不相等的行继续存在而删除了第一行,然后将其返回给`rawData` RDD 变量再次。 因此,我将使用`rawData`,过滤掉第一行,并创建一个仅包含数据本身的新`rawData`。 到目前为止和我在一起? 没那么复杂。
现在,我们将使用`map`函数。 接下来,我们需要开始从此信息中构造出更多结构。 现在,我的 RDD 的每一行都只是一行文本,它是逗号分隔的文本,但是它仍然只是一个巨大的文本行,我想采用逗号分隔的值列表并将其实际拆分成单个文本 领域。 归根结底,我希望将每个 RDD 从一行包含一堆用逗号分隔的信息的文本行转换为 Python 列表,该列表对我拥有的每列信息都有实际的单独字段。 因此,这就是 lambda 函数的作用:
现在,我们将使用`map`函数。 接下来,我们需要开始从此信息中构造出更多结构。 现在,我的 RDD 的每一行都只是一行文本,它是逗号分隔的文本,但是它仍然只是一个巨大的文本行,我想采用逗号分隔的值列表并将其实际拆分成单个文本 领域。 归根结底,我希望将每个 RDD 从一行包含一堆用逗号分隔的信息的文本行转换为 Python 列表,该列表对我拥有的每列信息都有实际的单独字段。 因此,这就是 Lambda 函数的作用:
```py
csvData = rawData.map(lambda x: x.split(","))
......@@ -552,7 +552,7 @@ csvData = rawData.map(lambda x: x.split(","))
它调用内置的 Python 函数`split`,该函数将接受一行输入,并将其分割为逗号字符,然后将其分成每个用逗号分隔的字段的列表。
这个`map`函数的输出是一个新的 RDD,称为`csvData`,我在其中传递了一个 lambda 函数,该函数仅基于逗号将每一行拆分为多个字段。 而且,此时`csvData`是一个 RDD,它在每一行上包含一个列表,其中每个元素都是源数据中的一列。 现在,我们越来越近了。
这个`map`函数的输出是一个新的 RDD,称为`csvData`,我在其中传递了一个 Lambda 函数,该函数仅基于逗号将每一行拆分为多个字段。 而且,此时`csvData`是一个 RDD,它在每一行上包含一个列表,其中每个元素都是源数据中的一列。 现在,我们越来越近了。
事实证明,为了将决策树与 MLlib 一起使用,需要做两件事。 首先,输入必须采用`LabeledPoint`数据类型的形式,并且本质上必须全部为数字。 因此,我们需要将所有原始数据转换为 MLlib 实际可以使用的数据,这就是我们先前跳过的`createLabeledPoints`函数所做的。 我们将在一秒钟内解决这个问题,首先是它的调用:
......@@ -772,7 +772,7 @@ clusters = KMeans.train(data, K, maxIterations=10,
好了,现在我们可以使用该集群了。
让我们开始打印出每个点的聚类分配。 因此,我们将获取原始数据并使用 lambda 函数对其进行转换:
让我们开始打印出每个点的聚类分配。 因此,我们将获取原始数据并使用 Lambda 函数对其进行转换:
```py
resultRDD = data.map(lambda point: clusters.predict(point)).cache()
......@@ -814,9 +814,9 @@ print("Within Set Sum of Squared Error = " + str(WSSSE))
```
首先,我们定义此`error`函数,该函数计算每个点的平方误差。 它只是计算从点到每个聚类的质心中心的距离并将其求和。 为此,我们获取源数据,在其上调用一个 lambda 函数,该函数实际上计算每个质心中心点的误差,然后可以在此处将不同的操作链接在一起。
首先,我们定义此`error`函数,该函数计算每个点的平方误差。 它只是计算从点到每个聚类的质心中心的距离并将其求和。 为此,我们获取源数据,在其上调用一个 Lambda 函数,该函数实际上计算每个质心中心点的误差,然后可以在此处将不同的操作链接在一起。
首先,我们调用`map`计算每个点的误差。 然后,为了得到代表整个数据集的最终总数,我们在该结果上调用`reduce`。 因此,我们正在执行`data.map`以计算每个点的误差,然后执行`reduce`以获取所有这些误差并将它们加在一起。 这就是 lambda 小功能的作用。 这基本上是一种幻想的表达方式,“我希望您将这个 RDD 中的所有内容加起来成为一个最终结果。” `reduce`会同时提取整个 RDD,一次两件事,然后使用您提供的任何功能将它们组合在一起。 我上面提供的功能是“将要合并的两行合并起来。”
首先,我们调用`map`计算每个点的误差。 然后,为了得到代表整个数据集的最终总数,我们在该结果上调用`reduce`。 因此,我们正在执行`data.map`以计算每个点的误差,然后执行`reduce`以获取所有这些误差并将它们加在一起。 这就是 Lambda 小功能的作用。 这基本上是一种幻想的表达方式,“我希望您将这个 RDD 中的所有内容加起来成为一个最终结果。” `reduce`会同时提取整个 RDD,一次两件事,然后使用您提供的任何功能将它们组合在一起。 我上面提供的功能是“将要合并的两行合并起来。”
如果我们在 RDD 的每个条目中都这样做,那么最终将得出最终的总计。 总结一堆值似乎有些费解,但是通过这种方式,我们可以确保我们可以根据需要实际分发此操作。 实际上,我们最终可以在一台计算机上计算一个数据的总和,然后在另一台计算机上计算另一数据的总和,然后将这两个总和相结合,以将它们组合在一起成为最终结果。 此`reduce`函数表示,如何从此操作中获取任何两个中间结果,并将它们组合在一起?
......
......@@ -100,7 +100,7 @@ SciPy 软件包中的`binom`函数有助于生成二项式分布以及与之相
## 泊松分布
泊松分布是间隔中独立间隔出现的概率分布。 二项式分布用于确定二进制出现的概率,而泊松分布用于基于计数的分布。 如果 lambda 是每个时间间隔内事件的平均发生率,则在给定的时间间隔内发生`k`的可能性由以下公式给出:
泊松分布是间隔中独立间隔出现的概率分布。 二项式分布用于确定二进制出现的概率,而泊松分布用于基于计数的分布。 如果 Lambda 是每个时间间隔内事件的平均发生率,则在给定的时间间隔内发生`k`的可能性由以下公式给出:
![A Poisson distribution](img/3450_02_10.jpg)
......
......@@ -596,7 +596,7 @@ In: grouped_targets_var = iris.groupby(['target']).var()
![](img/d89cc76a-fb61-43b0-a558-a8c9840a65e2.png)
由于您可能需要对每个变量进行多个统计,因此您可以直接使用`agg`方法,并针对每个变量应用特定的功能,而不是通过串联将多个聚合的数据集放在一起。 您可以通过字典定义变量,字典中的键是变量标签,值是要应用的函数的列表–由字符串(例如`'mean'``'std'``'min'``'max'``'sum'``'prod'`)或通过当场声明的预定义函数甚至是 lambda 函数:
由于您可能需要对每个变量进行多个统计,因此您可以直接使用`agg`方法,并针对每个变量应用特定的功能,而不是通过串联将多个聚合的数据集放在一起。 您可以通过字典定义变量,字典中的键是变量标签,值是要应用的函数的列表–由字符串(例如`'mean'``'std'``'min'``'max'``'sum'``'prod'`)或通过当场声明的预定义函数甚至是 Lambda 函数:
```py
In: funcs = {'sepal_length': ['mean','std'],
......@@ -631,7 +631,7 @@ In: median_time_series = pd.rolling_median(time_series, 5)
相反,可以执行此操作以获得值的滚动中值。 在这两种情况下,窗口的大小均为 5。
更一般而言,`apply()` Pandas 方法能够以编程方式执行任何按行或按列的操作。 `apply()`应该直接在`DataFrame`上调用; 第一个参数是按行或按列应用的函数; 第二个参数是对其应用轴。 请注意,该函数可以是内置的,库提供的,lambda 或任何其他用户定义的函数。
更一般而言,`apply()` Pandas 方法能够以编程方式执行任何按行或按列的操作。 `apply()`应该直接在`DataFrame`上调用; 第一个参数是按行或按列应用的函数; 第二个参数是对其应用轴。 请注意,该函数可以是内置的,库提供的,Lambda 或任何其他用户定义的函数。
作为此强大方法的示例,让我们尝试计算每行中有多少个非零元素。 使用`apply`方法,这很简单:
......
......@@ -94,7 +94,7 @@
* 数据可视化
* 部署到实时仪表板
为了进行试验,第一个实现是一个简单的 Python 应用程序,该应用程序使用 [tweepy 库](https://pypi.python.org/pypi/tweepy)(Python 的官方 Twitter 库)连接到 Twitter 并获得一系列 tweet [textblob](https://pypi.python.org/pypi/textblob)(用于基本 NLP 的简单 Python 库),以丰富情感分析。
为了进行试验,第一个实现是一个简单的 Python 应用程序,该应用程序使用 [tweepy 库](https://pypi.python.org/pypi/tweepy)(Python 的官方 Twitter 库)连接到 Twitter 并获得一系列推文[textblob](https://pypi.python.org/pypi/textblob)(用于基本 NLP 的简单 Python 库),以丰富情感分析。
然后将结果保存到 JSON 文件中进行分析。 这个原型是使事情开始并快速进行实验的好方法,但是经过几次迭代,我们很快意识到我们需要认真对待并构建满足企业需求的体系结构。
......
......@@ -18,7 +18,7 @@
# 为什么选择 Python?
像许多开发人员一样,当用于构建数据密集型项目时,使用 Python 并不是我的首选。 老实说,在 Java 领域已经工作了很多年,尽管学习曲线相当陡峭,但 Scala 最初对我来说更具吸引力。 Scala 是一种非常强大的语言,完美地结合了 Java 所欠缺的面向对象和函数式编程(直到 Java 8 开始引入 lambda 表达式为止)。
像许多开发人员一样,当用于构建数据密集型项目时,使用 Python 并不是我的首选。 老实说,在 Java 领域已经工作了很多年,尽管学习曲线相当陡峭,但 Scala 最初对我来说更具吸引力。 Scala 是一种非常强大的语言,完美地结合了 Java 所欠缺的面向对象和函数式编程(直到 Java 8 开始引入 Lambda 表达式为止)。
Scala 还提供了一种非常简洁的语法,可以将其转换为更少的代码行,更高的生产率并最终减少错误。 这非常方便,尤其是当您大部分工作是操纵数据时。 喜欢 Scala 的另一个原因是,在使用大数据框架(例如 Apache Spark)时,它们具有更好的 API 覆盖范围,这些框架本身是用 Scala 编写的。 还有很多其他理由更喜欢 Scala,例如它是一个强大的类型化系统,并且它与 Java 互操作性,在线文档和高性能。
......
......@@ -1244,7 +1244,7 @@ JavaScript 代码由`pixiedust_node`魔术处理,并发送到 Node 子进程
![Run Node.js inside a Python Notebook](img/00106.jpeg)
反向也相同。 以下代码首先在节点单元格中的 JavaScript 中创建一个 JSON 变量,然后在 Python 单元格中创建并显示 Pandas DataFrame
反向也相同。 以下代码首先在节点单元格中的 JavaScript 中创建一个 JSON 变量,然后在 Python 单元格中创建并显示 Pandas `DataFrame`
```py
%%node
......
......@@ -234,7 +234,7 @@ auth.set_access_token(access_token, access_token_secret)
### 注意
为简单起见,该实现不包括在处理完文件后清理文件。 此实现的另一个次要限制是,我们当前要等到缓冲区被填满后才能写入文件,如果没有新的 tweet 出现,理论上可能会花费很长时间。
为简单起见,该实现不包括在处理完文件后清理文件。 此实现的另一个次要限制是,我们当前要等到缓冲区被填满后才能写入文件,如果没有新的推文出现,理论上可能会花费很长时间。
`RawTweetsListener`的代码如下所示:
......@@ -328,7 +328,7 @@ class RawTweetsListener(StreamListener):
[您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode3.py)。
* `field_metadata`包含 Spark 数据类型,我们稍后将在创建 Spark 流查询时使用它来构建架构。
* `field_metadata`还包含一个可选的变换`lambda`函数,用于在将值写入磁盘之前清除该值。 作为参考,Python 中的 lambda 函数是内联定义的匿名函数(请参见[这个页面](https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions))。 我们将其用于通常作为 HTML 片段返回的源字段。 在此 lambda 函数中,我们使用 BeautifulSoup 库(在上一章中也使用过)仅提取文本,如以下代码片段所示:
* `field_metadata`还包含一个可选的变换`lambda`函数,用于在将值写入磁盘之前清除该值。 作为参考,Python 中的 Lambda 函数是内联定义的匿名函数(请参见[这个页面](https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions))。 我们将其用于通常作为 HTML 片段返回的源字段。 在此 Lambda 函数中,我们使用 BeautifulSoup 库(在上一章中也使用过)仅提取文本,如以下代码片段所示:
```py
lambda s: BS(s, "html.parser").text.strip()
......@@ -373,7 +373,7 @@ stream.disconnect()
**注**:Spark 结构化流支持多种类型的输入源,包括文件,Kafka,套接字和速率。 (Socket 和 Rate 均仅用于测试。)
下图取自 Spark 网站,并很好地解释了如何将新数据附加到 Streaming DataFrame
下图取自 Spark 网站,并很好地解释了如何将新数据附加到 Streaming `DataFrame`
![Creating a Spark Streaming DataFrame](img/00149.jpeg)
......@@ -383,7 +383,7 @@ stream.disconnect()
Spark Streaming Python API 提供了一种优雅的方法,可以使用`spark.readStream`属性创建 Streaming `DataFrame`,该属性创建一个新的`pyspark.sql.streamingreamReader`对象,该对象方便地使您链接方法调用,并具有创建更清晰代码的额外好处(请参见[这个页面](https://en.wikipedia.org/wiki/Method_chaining),以获取有关此模式的更多详细信息)。
例如,要创建 CSV 文件流,我们使用`csv`调用 format 方法,链接适用的选项,然后使用目录路径调用`load`方法:
例如,要创建 CSV 文件流,我们使用`csv`调用`format`方法,链接适用的选项,然后使用目录路径调用`load`方法:
```py
schema = StructType(
......@@ -403,7 +403,7 @@ csv_sdf = spark.readStream\
[您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode5.py)
`spark.readStream`还提供了一种方便的高级`csv`方法,该方法将 path 作为选项的第一个参数和关键字参数:
`spark.readStream`还提供了一种方便的高级`csv`方法,该方法将路径作为选项的第一个参数和关键字参数:
```py
csv_sdf = spark.readStream \
......@@ -778,7 +778,7 @@ def enrich(self, data):
[您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode12.py)。
然后将结果存储在`data`对象中,该对象将被写入 CSV 文件。 我们还防止意外的异常跳过当前的 tweet 并记录警告消息,而不是让异常冒泡,这会阻止 Twitter 流。
然后将结果存储在`data`对象中,该对象将被写入 CSV 文件。 我们还防止意外的异常跳过当前的推文并记录警告消息,而不是让异常冒泡,这会阻止 Twitter 流。
### 注意
......@@ -884,7 +884,7 @@ Batch: 2
+----------+---------------+---------------+---------+------------+-------------+
```
最后,我们使用 Parquet `output`接收器运行结构化查询,创建一个批量`DataFrame`,并使用 PixieDust `display()`浏览数据以显示例如按情感分类的 tweets 数(`positive`,`negative`,`neutral`)由实体聚类,如下图所示:
最后,我们使用 Parquet `output`接收器运行结构化查询,创建一个批量`DataFrame`,并使用 PixieDust `display()`浏览数据以显示例如按情感分类的推文数(`positive`,`negative`,`neutral`)由实体聚类,如下图所示:
![Getting started with the IBM Watson Natural Language Understanding service](img/00154.jpeg)
......@@ -1136,7 +1136,7 @@ print("Number of tweets received: {}".format(streams_manager.twitter_stream.list
从前面的代码中有多个注意事项:
* 当我们尝试加载`parquet_df`批量`DataFrame`时,Parquet 文件的输出目录可能未准备好,这会导致异常。 为了解决此时序问题,我们将代码包装到`try...except`语句中,并使用`time.sleep(5)`等待 5 秒钟。
* 我们还将在标题中显示当前的推文计数。 为此,我们添加一个`<div>`元素,该元素每 5 秒刷新一次,并添加一个`<pd_script>`,该元素使用`streams_manager.twitter_stream.listener.tweet_count`来打印当前的 tweet 计数,该变量是我们添加到 `RawTweetsListener`类。 我们还更新了`on_data()`方法,以在每次有新的 tweet 到达时增加`tweet_count`变量,如以下代码所示:
* 我们还将在标题中显示当前的推文计数。 为此,我们添加一个`<div>`元素,该元素每 5 秒刷新一次,并添加一个`<pd_script>`,该元素使用`streams_manager.twitter_stream.listener.tweet_count`来打印当前的推文计数,该变量是我们添加到 `RawTweetsListener`类。 我们还更新了`on_data()`方法,以在每次有新的推文到达时增加`tweet_count`变量,如以下代码所示:
```py
[[TweetInsightApp]]
......@@ -1317,7 +1317,7 @@ app.run()
[您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode26.py)。
示例应用程序的第 3 部分现已完成; 您可以在这里找到完整的 Notebook
示例应用程序的第 3 部分现已完成; 您可以在这里找到完整的笔记本
[### 注意](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%203.ipynb)。
......@@ -1497,7 +1497,7 @@ def on_data(self, data):
将 Streaming Analytics 服务与项目相关联
现在我们准备创建定义我们的 tweet 数据的丰富处理的流。
现在我们准备创建定义我们的推文数据的丰富处理的流。
我们转到**资产**选项卡,向下滚动到**流式流**部分,然后单击**新流式流**按钮。 在下一页中,我们提供一个名称,选择 Streaming Analytics 服务,手动选择,然后单击**创建**按钮。
......@@ -1505,7 +1505,7 @@ def on_data(self, data):
* 面板的“源”部分中的**消息中心:数据的输入源**。 进入画布后,我们将其重命名为`Source Message Hub`(通过双击它进入编辑模式)。
* 处理和分析部分的**代码**:它将包含调用 Watson NLU 服务的数据丰富 Python 代码。 我们将运算符重命名为`Enrichment`。
* 面板的 Targets 部分中的 **消息中心:丰富数据的输出源**。 我们将其重命名为`Target Message Hub`。
* 面板的目标部分中的 **消息中心:丰富数据的输出源**。 我们将其重命名为`Target Message Hub`。
接下来,我们在**源消息中心**和**扩展**之间以及**扩展**和**目标消息中心**之间创建连接。 要在两个操作员之间建立连接,只需抓住第一个操作员末端的输出端口并将其拖到另一个操作员的输入端口即可。 请注意,源操作员在框的右侧仅具有一个输出端口,以表示它仅支持传出连接,而目标操作员在左侧仅具有一个输入端口,以表示仅支持传入连接。 **处理和分析**部分中的任何操作员在左侧和右侧都有两个端口,因为它们都接受传入和传出连接。
......@@ -1556,7 +1556,7 @@ def init(state):
![Enriching the tweets data with the Streaming Analytics service](img/00163.jpeg)
watson_cloud_developer 包添加到流中
`watson_cloud_developer`包添加到流中
在每个事件数据上调用过程方法。 我们使用它来调用 Watson NLU 并将额外的信息添加到事件对象中,如以下代码所示:
......@@ -1619,7 +1619,7 @@ def process(event, state):
**注意**:您可能已经注意到,不同的系统对此选项使用不同的名称,这使其更易于出错。 幸运的是,如果拼写错误,将显示带有明确根本原因消息的异常。
前面的选项用于 Spark Streaming,我们仍然需要配置 Kafka 凭据,以便可以使用 Message Hub 服务对较低级别的 Kafka 使用者进行正确的身份验证。 为了将这些消费者属性正确地传递给 Kafka,我们不使用`.option`方法,而是创建一个`kafka_options`字典,并将其传递给 load 方法,如以下代码所示:
前面的选项用于 Spark Streaming,我们仍然需要配置 Kafka 凭据,以便可以使用 Message Hub 服务对较低级别的 Kafka 使用者进行正确的身份验证。 为了将这些消费者属性正确地传递给 Kafka,我们不使用`.option`方法,而是创建一个`kafka_options`字典,并将其传递给`load`方法,如以下代码所示:
```py
def start_streaming_dataframe():
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册