提交 35b4da27 编写于 作者: J Jonas Traub 提交者: Aljoscha Krettek

[FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks

This closes #2165
上级 d34bdaf7
......@@ -48,14 +48,13 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
</div>
</div>
## Assigning Timestamps
In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
stream needs to get its event timestamp *assigned*. That happens usually by accessing/extracting the
timestamp from some field in the element.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
the progress in event time.
There are two ways to assign timestamps and generate Watermarks:
......@@ -63,6 +62,8 @@ There are two ways to assign timestamps and generate Watermarks:
1. Directly in the data stream source
2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted
<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
millliseconds since the Java epoch of 1970-01-01T00:00:00Z.
### Source Functions with Timestamps and Watermarks
......@@ -116,18 +117,18 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
In any case, the timestamp assigner needs to be specified before the first operation on event time
(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
Flink allows the specification of a timestamp assigner / watermark emitter inside
the source (or consumer) itself. More information on how to do so can be found in the
[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
Flink allows the specification of a timestamp assigner / watermark emitter inside
the source (or consumer) itself. More information on how to do so can be found in the
[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
**NOTE:** The remainder of this section presents the main interfaces a programmer has
to implement in order to create her own timestamp extractors/watermark emitters.
To see the pre-implemented extractors that ship with Flink, please refer to the
to implement in order to create her own timestamp extractors/watermark emitters.
To see the pre-implemented extractors that ship with Flink, please refer to the
[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page.
<div class="codetabs" markdown="1">
......@@ -137,7 +138,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
......@@ -157,7 +158,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter());
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
......@@ -176,7 +177,7 @@ withTimestampsAndWatermarks
#### **With Periodic Watermarks**
The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
on the stream elements, or purely based on processing time).
The interval (every *n* milliseconds) in which the watermark will be generated is defined via
......@@ -202,7 +203,7 @@ public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermar
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
......@@ -229,7 +230,7 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
......@@ -249,7 +250,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
......@@ -273,7 +274,7 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
......
......@@ -48,11 +48,11 @@ public final class Watermark extends StreamElement {
// ------------------------------------------------------------------------
/** The timestamp of the watermark */
/** The timestamp of the watermark in milliseconds*/
private final long timestamp;
/**
* Creates a new watermark with the given timestamp.
* Creates a new watermark with the given timestamp in milliseconds.
*/
public Watermark(long timestamp) {
this.timestamp = timestamp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册