提交 8a8d95e3 编写于 作者: D David Anderson 提交者: Greg Hogan

[FLINK-6512] [docs] improved code formatting in some examples

This closes #3857
上级 91f37658
......@@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
{% highlight java %}
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
{% endhighlight %}
......@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t
{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
DataSet<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
{% endhighlight %}
In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
{% endhighlight %}
......@@ -147,11 +148,12 @@ Access them in any rich user function:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
{% endhighlight %}
......@@ -198,8 +200,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
{% endhighlight %}
......
......@@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap
functions. The first is an example of a function with **keyed** state, while
the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
{% highlight java %}
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> counter;
private transient ValueState<Integer> counter;
private final int numberElements;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private final int threshold;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
}
{% endhighlight %}
The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key input stream of the form
......@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
The `ListCheckpointed` interface requires the implementation of two methods:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
{% highlight java %}
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
void restoreState(List<T> state) throws Exception;
{% endhighlight %}
Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
......@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as
return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
is included below:
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
{% highlight java %}
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
{% endhighlight %}
As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
compatibility reasons and more details will be explained at the end of this section.
......@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect
The `CheckpointedFunction` interface requires again the implementation of two methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
{% highlight java %}
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
{% endhighlight %}
As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
......@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState(
types of state are initialized, but also where state recovery logic is included. An implementation of the
`CheckpointedFunction` interface for `BufferingSink` is presented below.
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
{% highlight java %}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
{% endhighlight %}
The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
......@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `Co
the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
would look like this:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
{% highlight java %}
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
private transient ValueState<Integer> counter;
private transient ValueState<Integer> counter;
private final int numberElements;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//all managed, nothing to do.
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
}
{% endhighlight %}
Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
upon checkpointing.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册