Commit 7a339a65 authored by Stephan Ewen

[FLINK-4959] [docs] Add documentation for ProcessFunction

Parent fdce1f31
......@@ -27,11 +27,47 @@ under the License.
* This will be replaced by the TOC
{:toc}
## Levels of Abstraction
Flink offers different levels of abstraction to develop streaming/batch applications.
<img src="../fig/levels_of_abstraction.svg" alt="Programming levels of abstraction" class="offset" width="80%" />
- The lowest level abstraction simply offers **stateful streaming**. It is embedded into the [DataStream API](../dev/datastream_api.html)
via the [Process Function](../dev/stream/process_function.html). It allows users to freely process events from one or more streams,
and to use consistent, fault-tolerant *state*. In addition, users can register event time and processing time callbacks,
allowing programs to realize sophisticated computations.
- In practice, most applications would not need the low-level abstraction described above, but would instead program against the
**Core APIs** like the [DataStream API](../dev/datastream_api.html) (bounded/unbounded streams) and the [DataSet API](../dev/batch/index.html)
(bounded data sets). These fluent APIs offer the common building blocks for data processing, like various forms of user-specified
transformations, joins, aggregations, windows, state, etc. Data types processed in these APIs are represented as classes
in the respective programming languages.
The low-level *Process Function* integrates with the *DataStream API*, making it possible to drop to the lower-level abstraction
for certain operations only. The *DataSet API* offers additional primitives on bounded data sets, like loops/iterations.
- The **Table API** is a declarative DSL centered around *tables*, which may be dynamically changing tables (when representing streams).
The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases)
and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc.
Table API programs declaratively define *what logical operation should be done* rather than specifying exactly
*how the code for the operation looks*. Though the Table API is extensible with various types of user-defined
functions, it is less expressive than the *Core APIs*, but more concise to use (less code to write).
In addition, Table API programs also go through an optimizer that applies optimization rules before execution.
One can seamlessly convert between tables and *DataStream*/*DataSet*, allowing programs to mix the *Table API* with the *DataStream*
and *DataSet* APIs.
- The highest level abstraction offered by Flink is **SQL**. This abstraction is similar to the *Table API* both in semantics and
expressiveness, but represents programs as SQL query expressions.
The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the *Table API*.
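
To make the mixing of levels concrete, the following sketch shows a DataStream program that drops to the low-level `ProcessFunction` for one operation and then continues with the Core API (`MySource`, `MyProcessFunction`, `MyMapper`, and `MySink` are hypothetical placeholders, not part of Flink):

{% highlight java %}
// a sketch only: MySource, MyProcessFunction, MyMapper, and MySink
// are hypothetical user classes
DataStream<Event> events = env.addSource(new MySource());

events
    .keyBy("id")                          // Core API: declarative key partitioning
    .process(new MyProcessFunction())     // low level: raw events, state, timers
    .map(new MyMapper())                  // back in the Core API
    .addSink(new MySink());
{% endhighlight %}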
## Programs and Dataflows
The basic building blocks of Flink programs are **streams** and **transformations**. (Note that the
DataSets used in Flink's DataSet API are also streams internally -- more about that
later.) Conceptually a *stream* is a (potentially never-ending) flow of data records, and a *transformation* is an
operation that takes one or more streams as input, and produces one or more output streams as a
result.
......@@ -40,7 +76,7 @@ Each dataflow starts with one or more **sources** and ends in one or more **sink
arbitrary **directed acyclic graphs** *(DAGs)*. Although special forms of cycles are permitted via
*iteration* constructs, for the most part we will gloss over this for simplicity.
<img src="{{ site.baseurl }}/fig/program_dataflow.svg" alt="A DataStream program, and its dataflow." class="offset" width="80%" />
<img src="../fig/program_dataflow.svg" alt="A DataStream program, and its dataflow." class="offset" width="80%" />
Often there is a one-to-one correspondence between the transformations in the programs and the operators
in the dataflow. Sometimes, however, one transformation may consist of multiple transformation operators.
......@@ -49,19 +85,15 @@ in the dataflow. Sometimes, however, one transformation may consist of multiple
## Parallel Dataflows
Programs in Flink are inherently parallel and distributed. During execution, a *stream* has one or more **stream partitions**,
and each *operator* has one or more **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
and possibly on different machines or containers.
The number of operator subtasks is the **parallelism** of that particular operator. The parallelism of a stream
is always that of its producing operator. Different operators of the same program may have different
levels of parallelism.
<img src="{{ site.baseurl }}/fig/parallel_dataflow.svg" alt="A parallel dataflow" class="offset" width="80%" />
<img src="../fig/parallel_dataflow.svg" alt="A parallel dataflow" class="offset" width="80%" />
Streams can transport data between two operators in a *one-to-one* (or *forwarding*) pattern, or in a *redistributing* pattern:
......@@ -93,7 +125,7 @@ Windows can be *time driven* (example: every 30 seconds) or *data driven* (examp
One typically distinguishes different types of windows, such as *tumbling windows* (no overlap),
*sliding windows* (with overlap), and *session windows* (punctuated by a gap of inactivity).
<img src="{{ site.baseurl }}/fig/windows.svg" alt="Time- and Count Windows" class="offset" width="80%" />
<img src="../fig/windows.svg" alt="Time- and Count Windows" class="offset" width="80%" />
More window examples can be found in this [blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
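
As a sketch of how these window types look in the DataStream API (method names as in the windowing documentation; the stream and field positions are illustrative):

{% highlight java %}
import org.apache.flink.streaming.api.windowing.time.Time;

// time-driven: tumbling windows of 30 seconds
stream
    .keyBy(0)
    .timeWindow(Time.seconds(30))
    .sum(1);

// data-driven: sliding count windows, evaluated every 50 elements over the last 100
stream
    .keyBy(0)
    .countWindow(100, 50)
    .sum(1);
{% endhighlight %}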
......@@ -112,7 +144,7 @@ of time:
- **Processing Time** is the local time at each operator that performs a time-based operation.
<img src="{{ site.baseurl }}/fig/event_ingestion_processing_time.svg" alt="Event Time, Ingestion Time, and Processing Time" class="offset" width="80%" />
<img src="../fig/event_ingestion_processing_time.svg" alt="Event Time, Ingestion Time, and Processing Time" class="offset" width="80%" />
More details on how to handle time are in the [event time docs]({{ site.baseurl }}/dev/event_time.html).
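
For example, the notion of time a program operates on is selected on the execution environment (a sketch; see the event time docs for the full setup):

{% highlight java %}
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// use event time (alternatives: TimeCharacteristic.IngestionTime, TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{% endhighlight %}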
......@@ -131,7 +163,7 @@ and is restricted to the values associated with the current event's key. Alignin
makes sure that all state updates are local operations, guaranteeing consistency without transaction overhead.
This alignment also allows Flink to redistribute the state and adjust the stream partitioning transparently.
<img src="{{ site.baseurl }}/fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" />
<img src="../fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" />
{% top %}
......@@ -170,4 +202,4 @@ same way as well as they apply to streaming programs, with minor exceptions:
## Next Steps
Continue with the basic concepts in Flink's [Distributed Runtime](runtime.html).
......@@ -35,7 +35,7 @@ The chaining behavior can be configured in the APIs.
The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.
<img src="{{ site.baseurl }}/fig/tasks_chains.svg" alt="Operator chaining into Tasks" class="offset" width="80%" />
<img src="../fig/tasks_chains.svg" alt="Operator chaining into Tasks" class="offset" width="80%" />
{% top %}
......@@ -62,7 +62,7 @@ The **client** is not part of the runtime and program execution, but is used to
After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the
Java/Scala program that triggers the execution, or in the command line process `./bin/flink run ...`.
<img src="{{ site.baseurl }}/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="80%" />
<img src="../fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="80%" />
{% top %}
......@@ -82,7 +82,7 @@ separate container, for example). Having multiple slots
means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and
heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead.
<img src="{{ site.baseurl }}/fig/tasks_slots.svg" alt="A TaskManager with Task Slots and Tasks" class="offset" width="80%" />
<img src="../fig/tasks_slots.svg" alt="A TaskManager with Task Slots and Tasks" class="offset" width="80%" />
By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as
they are from the same job. The result is that one slot may hold an entire pipeline of the
......@@ -96,7 +96,7 @@ job. Allowing this *slot sharing* has two main benefits:
With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the
slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
<img src="{{ site.baseurl }}/fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
<img src="../fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
The APIs also include a *resource group* mechanism which can be used to prevent undesirable slot sharing.
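
As a sketch (assuming the `slotSharingGroup(...)` method of the DataStream API), an operator can be isolated into its own resource group like this:

{% highlight java %}
// operators from here on are placed in the "heavy" slot sharing group
// and will not share slots with operators in the default group
stream
    .filter(new MyFilter())
    .slotSharingGroup("heavy");
{% endhighlight %}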
......@@ -112,7 +112,7 @@ stores data in an in-memory hash map, another state backend uses [RocksDB](http:
In addition to defining the data structure that holds the state, the state backends also implement the logic to
take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
<img src="{{ site.baseurl }}/fig/checkpoints.svg" alt="checkpoints and snapshots" class="offset" width="60%" />
<img src="../fig/checkpoints.svg" alt="checkpoints and snapshots" class="offset" width="60%" />
{% top %}
......
---
title: "Process Function (Low-level Operations)"
nav-title: "Process Function"
nav-parent_id: streaming
nav-pos: 35
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
## The ProcessFunction
The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
all (acyclic) streaming applications:
- events (stream elements)
- state (fault tolerant, consistent)
- timers (event time and processing time)
The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
by being invoked for each event received in the input stream(s).
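
Abbreviated to its two callbacks, a `ProcessFunction` has roughly the following shape (a sketch only, using the `RichProcessFunction` base class from the example below):

{% highlight java %}
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

public class MyProcessFunction extends RichProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // called for each input event: read/update state, register timers, emit results
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // called when a previously registered timer fires
    }
}
{% endhighlight %}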
For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
the `ProcessFunction` needs to be applied to a `KeyedStream`:
```java
stream.keyBy("id").process(new MyProcessFunction())
```
The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's
event time timestamp and to the *TimerService*. The `TimerService` can be used to register callbacks for future
event-time or processing-time instants. When a timer's time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.
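
A sketch of what this looks like inside `processElement(...)` (the input/output types `MyEvent` and `MyResult` are hypothetical; the 60-second offsets are just an example):

{% highlight java %}
@Override
public void processElement(MyEvent value, Context ctx, Collector<MyResult> out) throws Exception {
    // the element's event time timestamp (assigned by the timestamp assigner)
    long timestamp = ctx.timestamp();

    // register a callback in event time and one in processing time
    ctx.timerService().registerEventTimeTimer(timestamp + 60000);
    ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60000);
}
{% endhighlight %}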
## Low-level Joins
To realize low-level operations on two inputs, applications can use the `CoProcessFunction`. It relates to the `ProcessFunction`
in the same way as a `CoFlatMapFunction` relates to the `FlatMapFunction`: The function is typed to two different inputs and
gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
Implementing a low-level join typically follows this pattern (a sketch follows the list):
- Create a state object for one input (or both)
- Update the state upon receiving elements from its input
- Upon receiving elements from the other input, probe the state and produce the joined result
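
A sketch of this pattern, assuming the rich variant `RichCoProcessFunction` mirroring the `RichProcessFunction` above; the `Customer`, `Trade`, and `EnrichedTrade` types are hypothetical, and only one input keeps state here:

{% highlight java %}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction.Context;
import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.util.Collector;

// Customer, Trade, and EnrichedTrade are hypothetical example types
public class EnrichmentJoin extends RichCoProcessFunction<Customer, Trade, EnrichedTrade> {

    // latest customer record seen for the current key
    private ValueState<Customer> customerState;

    @Override
    public void open(Configuration parameters) throws Exception {
        customerState = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("customer", Customer.class));
    }

    @Override
    public void processElement1(Customer customer, Context ctx, Collector<EnrichedTrade> out) throws Exception {
        // update the state upon receiving elements from the first input
        customerState.update(customer);
    }

    @Override
    public void processElement2(Trade trade, Context ctx, Collector<EnrichedTrade> out) throws Exception {
        // probe the state and produce the joined result
        Customer customer = customerState.value();
        if (customer != null) {
            out.collect(new EnrichedTrade(trade, customer));
        }
    }
}
{% endhighlight %}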
## Example
The following example maintains counts per key, and emits the key/count pair if no update happened to the key for one minute
(in event time):
- The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
- For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
- The function also schedules a callback one minute into the future (in event time)
- Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
and emits the key/count if they match (i.e., no further update happened during that minute)
*Note:* This simple example could also have been implemented on top of session windows; we simply use it to illustrate
the basic pattern of how to use the `ProcessFunction`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.RichProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[(String, String)] = ...

// apply the process function onto a keyed stream
val result: DataStream[(String, Long)] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
 * The data type stored in the state
 */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
class CountWithTimeoutFunction extends RichProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, _) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      // emit the state if no further update happened during the minute
      case CountWithTimestamp(key, count, lastModified) if lastModified + 60000 == timestamp =>
        out.collect((key, count))
      case _ =>
    }
  }
}
{% endhighlight %}
</div>
</div>
{% top %}
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<svg
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:cc="http://creativecommons.org/ns#"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
version="1.1"
width="974.0144"
height="409.9375"
id="svg2">
<defs
id="defs4" />
<metadata
id="metadata7">
<rdf:RDF>
<cc:Work
rdf:about="">
<dc:format>image/svg+xml</dc:format>
<dc:type
rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
<dc:title></dc:title>
</cc:Work>
</rdf:RDF>
</metadata>
<g
transform="translate(258.42828,-167.38041)"
id="layer1">
<g
transform="translate(-323.70953,144.47416)"
id="g2989">
<path
d="m 66.203993,358.32677 0,73.59333 621.867427,0 0,-73.59333 -621.867427,0 z"
id="path2991"
style="fill:#e4eaf4;fill-opacity:1;fill-rule:evenodd;stroke:none" />
<path
d="m 66.203993,358.32677 621.867427,0 0,73.59333 -621.867427,0 z"
id="path2993"
style="fill:none;stroke:#898c92;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
<text
x="164.98396"
y="408.29218"
id="text2995"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stateful</text>
<text
x="293.41599"
y="408.29218"
id="text2997"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stream Processing</text>
<path
d="m 181.69526,246.88651 0,73.59333 506.37616,0 0,-73.59333 -506.37616,0 z"
id="path2999"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:none" />
<path
d="m 181.69526,246.88651 506.37616,0 0,73.59333 -506.37616,0 z"
id="path3001"
style="fill:none;stroke:#935f1c;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
<text
x="231.63388"
y="296.79422"
id="text3003"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataStream </text>
<text
x="428.33289"
y="296.79422"
id="text3005"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">/ </text>
<text
x="447.83777"
y="296.79422"
id="text3007"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataSet</text>
<text
x="582.12122"
y="296.79422"
id="text3009"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">API</text>
<path
d="m 288.93448,135.44624 0,73.4433 399.13694,0 0,-73.4433 -399.13694,0 z"
id="path3011"
style="fill:#be73f1;fill-opacity:1;fill-rule:evenodd;stroke:none" />
<path
d="m 288.93448,135.44624 399.13694,0 0,73.4433 -399.13694,0 z"
id="path3013"
style="fill:none;stroke:#724591;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
<text
x="414.60895"
y="185.29616"
id="text3015"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Table API</text>
<path
d="m 415.0409,23.855943 0,73.593334 273.03052,0 0,-73.593334 -273.03052,0 z"
id="path3017"
style="fill:#e6526e;fill-opacity:1;fill-rule:evenodd;stroke:none" />
<path
d="m 415.0409,23.855943 273.03052,0 0,73.593334 -273.03052,0 z"
id="path3019"
style="fill:none;stroke:#8a3142;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
<text
x="516.66846"
y="73.79821"
id="text3021"
xml:space="preserve"
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">SQL</text>
<text
x="722.66699"
y="292.85269"
id="text3023"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Core </text>
<text
x="782.38184"
y="292.85269"
id="text3025"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">APIs</text>
<text
x="722.66699"
y="181.35474"
id="text3027"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Declarative DSL</text>
<text
x="722.66699"
y="69.856773"
id="text3029"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">High</text>
<text
x="774.27985"
y="69.856773"
id="text3031"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text>
<text
x="782.68195"
y="69.856773"
id="text3033"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level Language</text>
<text
x="722.66699"
y="389.2005"
id="text3035"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Low</text>
<text
x="768.72845"
y="389.2005"
id="text3037"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text>
<text
x="777.13055"
y="389.2005"
id="text3039"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level building block</text>
<text
x="722.66699"
y="419.20798"
id="text3041"
xml:space="preserve"
style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">(streams, state, [event] time)</text>
</g>
</g>
</svg>