Commit 2714aaf3 authored by Robert Metzger

[FLINK-3296] Remove 'flushing' behavior of the OutputFormat support of the DataStream API

This closes #1563
Parent 8486c3c2
......@@ -1819,14 +1819,14 @@ of each element on the standard out / standard error stream. Optionally, a pref
prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
greater than 1, the output will also be prepended with the identifier of the task which produced the output.
- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.
- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`
- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
Apache Kafka) that are implemented as sink functions.
</div>
<div data-lang="scala" markdown="1">
......@@ -1847,7 +1847,7 @@ of each element on the standard out / standard error stream. Optionally, a pref
prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
greater than 1, the output will also be prepended with the identifier of the task which produced the output.
- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.
- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`
......@@ -1858,6 +1858,17 @@ greater than 1, the output will also be prepended with the identifier of the tas
</div>
</div>
Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes.
They do not participate in Flink's checkpointing, which means these functions usually have
at-least-once semantics. How data is flushed to the target system depends on the implementation of the
OutputFormat, so not all elements sent to the OutputFormat immediately show up
in the target system. In failure cases, those records might be lost.
For reliable, exactly-once delivery of a stream into a file system, use the `flink-connector-filesystem`.
Also, custom implementations through the `.addSink(...)` method can participate in Flink's checkpointing
for exactly-once semantics.
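For illustration, a minimal sketch of the debugging-style output described above, using the `writeUsingOutputFormat()` method introduced by this change; the output path `/tmp/debug-output` is only a placeholder:

```java
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteUsingOutputFormatExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("to", "be", "or", "not", "to", "be");

        // Debug-style output: routed through an OutputFormat sink, not part of
        // Flink's checkpointing; flushing depends entirely on the OutputFormat.
        lines.writeUsingOutputFormat(new TextOutputFormat<String>(new Path("/tmp/debug-output")));

        env.execute("writeUsingOutputFormat example");
    }
}
```

For production file output with stronger guarantees, a checkpoint-aware sink added via `addSink(...)` (for example from `flink-connector-filesystem`) is the recommended route.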
{% top %}
......
......@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
public class HBaseWriteStreamExample {
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
......@@ -64,14 +64,9 @@ public class HBaseWriteStreamExample {
isRunning = false;
}
});
dataStream.write(new HBaseOutputFormat(), 0L);
dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
try {
env.execute();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
env.execute();
}
/**
......
......@@ -90,7 +90,7 @@ public class IterateExample {
// emit results
if (fileOutput) {
numbers.writeAsText(outputPath, 1);
numbers.writeAsText(outputPath);
} else {
numbers.print();
}
......
......@@ -87,7 +87,7 @@ public class WindowJoin {
// emit result
if (fileOutput) {
joinedStream.writeAsText(outputPath, 1);
joinedStream.writeAsText(outputPath);
} else {
joinedStream.print();
}
......
......@@ -50,8 +50,6 @@ import java.util.concurrent.TimeUnit;
*/
public class IncrementalLearningSkeleton {
private static DataStream<Integer> trainingData = null;
private static DataStream<Integer> newData = null;
// *************************************************************************
// PROGRAM
......@@ -66,8 +64,8 @@ public class IncrementalLearningSkeleton {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
trainingData = env.addSource(new FiniteTrainingDataSource());
newData = env.addSource(new FiniteNewDataSource());
DataStream<Integer> trainingData = env.addSource(new FiniteTrainingDataSource());
DataStream<Integer> newData = env.addSource(new FiniteNewDataSource());
// build new model on every second of new data
DataStream<Double[]> model = trainingData
......@@ -80,7 +78,7 @@ public class IncrementalLearningSkeleton {
// emit result
if (fileOutput) {
prediction.writeAsText(outputPath, 1);
prediction.writeAsText(outputPath);
} else {
prediction.print();
}
......
......@@ -67,7 +67,7 @@ public class SocketTextStreamWordCount {
.sum(1);
if (fileOutput) {
counts.writeAsText(outputPath, 1);
counts.writeAsText(outputPath);
} else {
counts.print();
}
......
......@@ -61,7 +61,7 @@ object SocketTextStreamWordCount {
.sum(1)
if (fileOutput) {
counts.writeAsText(outputPath, 1)
counts.writeAsText(outputPath)
} else {
counts print
}
......
......@@ -78,7 +78,7 @@ public class WordCount {
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, 1);
counts.writeAsCsv(outputPath);
} else {
counts.print();
}
......
......@@ -114,7 +114,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
ExecutionConfig executionConfig;
try {
ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
ExecutionConfig c = InstantiationUtil.readObjectFromConfig(
getJobConfiguration(),
ExecutionConfig.CONFIG_KEY,
getUserCodeClassLoader());
......@@ -130,7 +130,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
try {
// initialize local strategies
MutableObjectIterator<IT> input1;
switch (this.config.getInputLocalStrategy(0)) {
......
......@@ -49,7 +49,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
......@@ -193,7 +193,7 @@ public class DataStream<T> {
unionedTransforms.add(newStream.getTransformation());
}
return new DataStream<T>(this.environment, new UnionTransformation<T>(unionedTransforms));
return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
/**
......@@ -208,7 +208,7 @@ public class DataStream<T> {
* @return The {@link SplitStream}
*/
public SplitStream<T> split(OutputSelector<T> outputSelector) {
return new SplitStream<T>(this, clean(outputSelector));
return new SplitStream<>(this, clean(outputSelector));
}
/**
......@@ -222,7 +222,7 @@ public class DataStream<T> {
* @return The {@link ConnectedStreams}.
*/
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
return new ConnectedStreams<T, R>(environment, this, dataStream);
return new ConnectedStreams<>(environment, this, dataStream);
}
/**
......@@ -235,7 +235,7 @@ public class DataStream<T> {
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
return new KeyedStream<T, K>(this, clean(key));
return new KeyedStream<>(this, clean(key));
}
/**
......@@ -250,7 +250,7 @@ public class DataStream<T> {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}
}
......@@ -266,11 +266,11 @@ public class DataStream<T> {
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
**/
public KeyedStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
......@@ -288,7 +288,7 @@ public class DataStream<T> {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
return partitionByHash(new Keys.ExpressionKeys<>(fields, getType()));
}
}
......@@ -303,7 +303,7 @@ public class DataStream<T> {
*
*/
public DataStream<T> partitionByHash(String... fields) {
return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
return partitionByHash(new Keys.ExpressionKeys<>(fields, getType()));
}
/**
......@@ -316,7 +316,7 @@ public class DataStream<T> {
* @return The partitioned DataStream
*/
public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector) {
return setConnectionType(new HashPartitioner<T>(clean(keySelector)));
return setConnectionType(new HashPartitioner<>(clean(keySelector)));
}
//private helper method for partitioning
......@@ -326,7 +326,7 @@ public class DataStream<T> {
getType(),
getExecutionConfig()));
return setConnectionType(new HashPartitioner<T>(keySelector));
return setConnectionType(new HashPartitioner<>(keySelector));
}
/**
......@@ -340,7 +340,7 @@ public class DataStream<T> {
* @return The partitioned DataStream.
*/
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new int[]{field}, getType());
Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new int[]{field}, getType());
return partitionCustom(partitioner, outExpressionKeys);
}
......@@ -355,7 +355,7 @@ public class DataStream<T> {
* @return The partitioned DataStream.
*/
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new String[]{field}, getType());
Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new String[]{field}, getType());
return partitionCustom(partitioner, outExpressionKeys);
}
......@@ -376,7 +376,7 @@ public class DataStream<T> {
* @see KeySelector
*/
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner),
return setConnectionType(new CustomPartitionerWrapper<>(clean(partitioner),
clean(keySelector)));
}
......@@ -385,7 +385,7 @@ public class DataStream<T> {
KeySelector<T, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig());
return setConnectionType(
new CustomPartitionerWrapper<K, T>(
new CustomPartitionerWrapper<>(
clean(partitioner),
clean(keySelector)));
}
......@@ -499,7 +499,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public IterativeStream<T> iterate() {
return new IterativeStream<T>(this, 0);
return new IterativeStream<>(this, 0);
}
/**
......@@ -535,7 +535,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public IterativeStream<T> iterate(long maxWaitTimeMillis) {
return new IterativeStream<T>(this, maxWaitTimeMillis);
return new IterativeStream<>(this, maxWaitTimeMillis);
}
/**
......@@ -557,7 +557,7 @@ public class DataStream<T> {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return transform("Map", outType, new StreamMap<T, R>(clean(mapper)));
return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
/**
......@@ -581,7 +581,7 @@ public class DataStream<T> {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<T, R>(clean(flatMapper)));
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
......@@ -600,7 +600,7 @@ public class DataStream<T> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter) {
return transform("Filter", getType(), new StreamFilter<T>(clean(filter)));
return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
}
......@@ -623,7 +623,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes) {
return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
return new StreamProjection<>(this, fieldIndexes).projectTupleX();
}
/**
......@@ -774,7 +774,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction);
}
......@@ -789,7 +789,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> printToErr() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>(true);
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
return addSink(printFunction);
}
......@@ -807,29 +807,9 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> writeAsText(String path) {
return write(new TextOutputFormat<T>(new Path(path)), 0L);
return writeUsingOutputFormat(new TextOutputFormat<T>(new Path(path)));
}
/**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, in every millis milliseconds.
*
* <p>
* For every element of the DataStream the result of {@link Object#toString()}
* is written.
*
* @param path
* The path pointing to the location the text file is written to.
* @param millis
* The file update frequency.
*
* @return The closed DataStream.
*/
@PublicEvolving
public DataStreamSink<T> writeAsText(String path, long millis) {
TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
return write(tof, millis);
}
/**
* Writes a DataStream to the file specified by path in text format.
......@@ -848,34 +828,11 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
tof.setWriteMode(writeMode);
return write(tof, 0L);
return writeUsingOutputFormat(tof);
}
/**
* Writes a DataStream to the file specified by path in text format.
*
* <p>
* For every element of the DataStream the result of {@link Object#toString()}
* is written.
*
* @param path
* The path pointing to the location the text file is written to
* @param writeMode
* Controls the behavior for existing files. Options are
* NO_OVERWRITE and OVERWRITE.
* @param millis
T the file update frequency
*
* @return The closed DataStream.
*/
@PublicEvolving
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis) {
TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
tof.setWriteMode(writeMode);
return write(tof, millis);
}
/**
* Writes a DataStream to the file specified by the path parameter.
......@@ -891,28 +848,9 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> writeAsCsv(String path) {
return writeAsCsv(path, null, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
return writeAsCsv(path, null, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
}
/**
* Writes a DataStream to the file specified by the path parameter. The
* writing is performed periodically, in every millis milliseconds.
*
* <p>
* For every field of an element of the DataStream the result of {@link Object#toString()}
* is written. This method can only be used on data streams of tuples.
*
* @param path
* the path pointing to the location the text file is written to
* @param millis
* the file update frequency
*
* @return the closed DataStream
*/
@PublicEvolving
public DataStreamSink<T> writeAsCsv(String path, long millis) {
return writeAsCsv(path, null, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
}
/**
* Writes a DataStream to the file specified by the path parameter.
......@@ -931,7 +869,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
return writeAsCsv(path, writeMode, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
return writeAsCsv(path, writeMode, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
}
/**
......@@ -947,31 +885,6 @@ public class DataStream<T> {
* @param writeMode
* Controls the behavior for existing files. Options are
* NO_OVERWRITE and OVERWRITE.
* @param millis
* the file update frequency
*
* @return the closed DataStream
*/
@PublicEvolving
public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis) {
return writeAsCsv(path, writeMode, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
}
/**
* Writes a DataStream to the file specified by the path parameter. The
* writing is performed periodically every millis milliseconds.
*
* <p>
* For every field of an element of the DataStream the result of {@link Object#toString()}
* is written. This method can only be used on data streams of tuples.
*
* @param path
* the path pointing to the location the text file is written to
* @param writeMode
* Controls the behavior for existing files. Options are
* NO_OVERWRITE and OVERWRITE.
* @param millis
* the file update frequency
* @param rowDelimiter
* the delimiter for two rows
* @param fieldDelimiter
......@@ -984,14 +897,13 @@ public class DataStream<T> {
public <X extends Tuple> DataStreamSink<T> writeAsCsv(
String path,
WriteMode writeMode,
long millis,
String rowDelimiter,
String fieldDelimiter) {
Preconditions.checkArgument(
getType().isTupleType(),
"The writeAsCsv() method can only be used on data streams of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(
CsvOutputFormat<X> of = new CsvOutputFormat<>(
new Path(path),
rowDelimiter,
fieldDelimiter);
......@@ -1000,7 +912,7 @@ public class DataStream<T> {
of.setWriteMode(writeMode);
}
return write((OutputFormat<T>) of, millis);
return writeUsingOutputFormat((OutputFormat<T>) of);
}
/**
......@@ -1017,7 +929,7 @@ public class DataStream<T> {
*/
@PublicEvolving
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
return returnStream;
}
......@@ -1025,13 +937,16 @@ public class DataStream<T> {
/**
* Writes the dataStream into an output, described by an OutputFormat.
*
* The output is not participating in Flink's checkpointing!
*
* For writing to a file system periodically, the use of the "flink-connector-filesystem" is recommended.
*
* @param format The output format
* @param millis the write frequency
* @return The closed DataStream
*/
@PublicEvolving
public DataStreamSink<T> write(OutputFormat<T> format, long millis) {
return addSink(new FileSinkFunctionByMillis<T>(format, millis));
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
return addSink(new OutputFormatSinkFunction<>(format));
}
/**
......@@ -1054,7 +969,7 @@ public class DataStream<T> {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
......@@ -1077,7 +992,7 @@ public class DataStream<T> {
* @return The modified DataStream.
*/
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
return new DataStream<T>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
}
/**
......@@ -1099,9 +1014,9 @@ public class DataStream<T> {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() );
}
StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
getExecutionEnvironment().addOperator(sink.getTransformation());
return sink;
......
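To illustrate the simplified write signatures in the `DataStream` API after this change (the millis-based variants are removed, leaving only path, write mode, and delimiters), here is a hedged usage sketch; the output paths are placeholders:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimplifiedWriteSignatures {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> counts =
                env.fromElements(new Tuple2<>("flink", 1), new Tuple2<>("stream", 2));

        // No update-frequency argument anymore: only path, write mode, and delimiters.
        counts.writeAsCsv("/tmp/counts.csv", WriteMode.OVERWRITE, "\n", ",");
        counts.writeAsText("/tmp/counts.txt", WriteMode.OVERWRITE);

        env.execute("Simplified write signatures");
    }
}
```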
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.OutputFormat;
/**
* Implementation of FileSinkFunction. Writes tuples to file in every millis
* milliseconds.
*
* @param <IN>
* Input type
*/
@PublicEvolving
public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private final long millis;
private long lastTime;
public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
super(format);
this.millis = millis;
lastTime = System.currentTimeMillis();
}
/**
* Condition for writing the contents of tupleList and clearing it.
*
* @return value of the updating condition
*/
@Override
protected boolean updateCondition() {
return System.currentTimeMillis() - lastTime >= millis;
}
/**
* Statements to be executed after writing a batch goes here.
*/
@Override
protected void resetParameters() {
tupleList.clear();
lastTime = System.currentTimeMillis();
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.functions.sink;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
......@@ -34,28 +33,21 @@ import org.slf4j.LoggerFactory;
/**
* Simple implementation of the SinkFunction writing tuples in the specified
* OutputFormat format. Tuples are collected to a list and written to the file
* periodically. The target path and the overwrite mode are pre-packaged in
* format.
* OutputFormat format.
*
* @param <IN>
* Input type
* @param <IN> Input type
*/
@PublicEvolving
public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> implements
InputTypeConfigurable {
public class OutputFormatSinkFunction<IN> extends RichSinkFunction<IN> implements InputTypeConfigurable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
private static final Logger LOG = LoggerFactory.getLogger(OutputFormatSinkFunction.class);
protected ArrayList<IN> tupleList = new ArrayList<IN>();
protected volatile OutputFormat<IN> format;
protected volatile boolean cleanupCalled = false;
protected int indexInSubtaskGroup;
protected int currentNumberOfSubtasks;
private OutputFormat<IN> format;
private boolean cleanupCalled = false;
public FileSinkFunction(OutputFormat<IN> format) {
public OutputFormatSinkFunction(OutputFormat<IN> format) {
this.format = format;
}
......@@ -63,8 +55,8 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> implemen
public void open(Configuration parameters) throws Exception {
RuntimeContext context = getRuntimeContext();
format.configure(parameters);
indexInSubtaskGroup = context.getIndexOfThisSubtask();
currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
int indexInSubtaskGroup = context.getIndexOfThisSubtask();
int currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
}
......@@ -78,66 +70,33 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> implemen
@Override
public void invoke(IN record) throws Exception {
tupleList.add(record);
if (updateCondition()) {
flush();
try {
format.writeRecord(record);
} catch (Exception ex) {
cleanup();
throw ex;
}
}
@Override
public void close() throws IOException {
if (!tupleList.isEmpty()) {
flush();
}
try {
format.close();
} catch (Exception ex) {
if (LOG.isErrorEnabled()) {
LOG.error("Error while writing element.", ex);
}
try {
if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
cleanupCalled = true;
((CleanupWhenUnsuccessful) format).tryCleanupOnError();
}
} catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}
cleanup();
throw ex;
}
}
protected void flush() {
private void cleanup() {
try {
for (IN rec : tupleList) {
format.writeRecord(rec);
if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
cleanupCalled = true;
((CleanupWhenUnsuccessful) format).tryCleanupOnError();
}
} catch (Exception ex) {
try {
if (LOG.isErrorEnabled()) {
LOG.error("Error while writing element.", ex);
}
if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
cleanupCalled = true;
((CleanupWhenUnsuccessful) format).tryCleanupOnError();
}
} catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}
throw new RuntimeException(ex);
} catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}
resetParameters();
}
/**
* Condition for writing the contents of tupleList and clearing it.
*
* @return value of the updating condition
*/
protected abstract boolean updateCondition();
/**
* Statements to be executed after writing a batch goes here.
*/
protected abstract void resetParameters();
}
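As a sketch of what the refactored sink function above expects from an OutputFormat, here is a hypothetical format (the class name and behavior are made up for illustration) that could be handed to `writeUsingOutputFormat(...)`; the sink now calls `writeRecord()` once per element with no buffering, and falls back to `tryCleanupOnError()` on failure if the format also implements `CleanupWhenUnsuccessful`:

```java
import java.io.IOException;

import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// Hypothetical format: prints every record; the cleanup hook is a no-op.
public class StdOutOutputFormat<T> implements OutputFormat<T>, CleanupWhenUnsuccessful {

    private static final long serialVersionUID = 1L;

    @Override
    public void configure(Configuration parameters) {
        // no configuration needed for this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // called once per parallel subtask before the first writeRecord()
    }

    @Override
    public void writeRecord(T record) throws IOException {
        // invoked for every element; there is no list buffering or periodic flushing anymore
        System.out.println(record);
    }

    @Override
    public void close() throws IOException {
        // called when the sink closes
    }

    @Override
    public void tryCleanupOnError() throws Exception {
        // invoked by OutputFormatSinkFunction if writeRecord() or close() fails
    }
}
```

A stream would then use it via, e.g., `lines.writeUsingOutputFormat(new StdOutOutputFormat<String>())` for a `DataStream<String>`.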
......@@ -738,21 +738,9 @@ class DataStream[T](stream: JavaStream[T]) {
*/
@PublicEvolving
def writeAsText(path: String): DataStreamSink[T] =
stream.writeAsText(path, 0L)
stream.writeAsText(path)
/**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
* @param path The path pointing to the location the text file is written to
* @param millis The file update frequency
* @return The closed DataStream
*/
@PublicEvolving
def writeAsText(path: String, millis: Long): DataStreamSink[T] =
stream.writeAsText(path, millis)
/**
* Writes a DataStream to the file specified by path in text format. For
......@@ -772,30 +760,6 @@ class DataStream[T](stream: JavaStream[T]) {
}
}
/**
* Writes a DataStream to the file specified by path in text format. The writing is performed
* periodically every millis milliseconds. For every element of the DataStream the result of
* .toString is written.
*
* @param path The path pointing to the location the text file is written to
* @param writeMode Controls the behavior for existing files. Options are NO_OVERWRITE and
* OVERWRITE.
* @param millis The file update frequency
* @return The closed DataStream
*/
@PublicEvolving
def writeAsText(
path: String,
writeMode: FileSystem.WriteMode,
millis: Long)
: DataStreamSink[T] = {
if (writeMode != null) {
stream.writeAsText(path, writeMode, millis)
} else {
stream.writeAsText(path, millis)
}
}
/**
* Writes the DataStream in CSV format to the file specified by the path parameter. The writing
* is performed periodically every millis milliseconds.
......@@ -808,25 +772,6 @@ class DataStream[T](stream: JavaStream[T]) {
writeAsCsv(
path,
null,
0L,
ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
}
/**
* Writes the DataStream in CSV format to the file specified by the path parameter. The writing
* is performed periodically every millis milliseconds.
*
* @param path Path to the location of the CSV file
* @param millis File update frequency
* @return The closed DataStream
*/
@PublicEvolving
def writeAsCsv(path: String, millis: Long): DataStreamSink[T] = {
writeAsCsv(
path,
null,
millis,
ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
}
......@@ -844,26 +789,6 @@ class DataStream[T](stream: JavaStream[T]) {
writeAsCsv(
path,
writeMode,
0L,
ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
}
/**
* Writes the DataStream in CSV format to the file specified by the path parameter. The writing
* is performed periodically every millis milliseconds.
*
* @param path Path to the location of the CSV file
* @param writeMode Controls whether an existing file is overwritten or not
* @param millis File update frequency
* @return The closed DataStream
*/
@PublicEvolving
def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, millis: Long): DataStreamSink[T] = {
writeAsCsv(
path,
writeMode,
millis,
ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
}
......@@ -874,7 +799,6 @@ class DataStream[T](stream: JavaStream[T]) {
*
* @param path Path to the location of the CSV file
* @param writeMode Controls whether an existing file is overwritten or not
* @param millis File update frequency
* @param rowDelimiter Delimiter for consecutive rows
* @param fieldDelimiter Delimiter for consecutive fields
* @return The closed DataStream
......@@ -883,7 +807,6 @@ class DataStream[T](stream: JavaStream[T]) {
def writeAsCsv(
path: String,
writeMode: FileSystem.WriteMode,
millis: Long,
rowDelimiter: String,
fieldDelimiter: String)
: DataStreamSink[T] = {
......@@ -892,16 +815,15 @@ class DataStream[T](stream: JavaStream[T]) {
if (writeMode != null) {
of.setWriteMode(writeMode)
}
stream.write(of.asInstanceOf[OutputFormat[T]], millis)
stream.writeUsingOutputFormat(of.asInstanceOf[OutputFormat[T]])
}
/**
* Writes a DataStream using the given [[OutputFormat]]. The
* writing is performed periodically, in every millis milliseconds.
* Writes a DataStream using the given [[OutputFormat]].
*/
@PublicEvolving
def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
stream.write(format, millis)
def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = {
stream.writeUsingOutputFormat(format)
}
/**
......
......@@ -58,7 +58,7 @@ public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
@Test
public void testPathMillis() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, 1);
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
}
@Test
......@@ -68,12 +68,12 @@ public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
@Test
public void testPathWriteModeMillis() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
}
@Test
public void testPathWriteModeMillisDelimiter() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ",");
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, "\n", ",");
}
@Test
......@@ -91,7 +91,7 @@ public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
public void failPathWriteModeMillis() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
try {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
fail("File should exist");
} catch (Exception e) {
assertTrue(e.getCause().getMessage().contains("File already exists"));
......@@ -102,7 +102,7 @@ public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
public void failPathWriteModeMillisDelimiter() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
try {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ",");
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, "\n", ",");
fail("File should exist.");
} catch (Exception e) {
assertTrue(e.getCause().getMessage().contains("File already exists"));
......
......@@ -53,20 +53,12 @@ public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
}
@Test
public void testPathMillis() throws Exception {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, 1);
}
@Test
public void testPathWriteMode() throws Exception {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
}
@Test
public void testPathWriteModeMillis() throws Exception {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
}
@Test
public void failPathWriteMode() throws Exception {
......@@ -79,17 +71,6 @@ public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
}
}
@Test
public void failPathWriteModeMillis() throws Exception {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
try {
OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
fail("File should exist.");
} catch (Exception e) {
assertTrue(e.getCause().getMessage().contains("File already exists"));
}
}
@After
public void closeFile() throws Exception {
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
......
......@@ -46,15 +46,6 @@ object OutputFormatTestPrograms {
env.execute("Scala WordCountToText")
}
def wordCountToText(input : String, outputPath : String, millis : Long) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(input)
val counts = wordCountProgram(text)
counts.writeAsText(outputPath, millis)
env.execute("Scala WordCountToText")
}
def wordCountToText(
input : String,
......@@ -70,20 +61,6 @@ object OutputFormatTestPrograms {
env.execute("Scala WordCountToText")
}
def wordCountToText(
input : String,
outputPath : String,
writeMode : FileSystem.WriteMode,
millis : Long) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(input)
val counts = wordCountProgram(text)
counts.writeAsText(outputPath, writeMode, millis)
env.execute("Scala WordCountToText")
}
def wordCountToCsv(input : String, outputPath : String) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
......@@ -96,16 +73,6 @@ object OutputFormatTestPrograms {
env.execute("Scala WordCountToCsv")
}
def wordCountToCsv(input : String, outputPath : String, millis : Long) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(input)
val counts = wordCountProgram(text)
counts.writeAsCsv(outputPath, millis)
env.execute("Scala WordCountToCsv")
}
def wordCountToCsv(
input : String,
......@@ -121,26 +88,11 @@ object OutputFormatTestPrograms {
env.execute("Scala WordCountToCsv")
}
def wordCountToCsv(
input : String,
outputPath : String,
writeMode : FileSystem.WriteMode,
millis : Long) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(input)
val counts = wordCountProgram(text)
counts.writeAsCsv(outputPath, writeMode, millis)
env.execute("Scala WordCountToCsv")
}
def wordCountToCsv(
input : String,
outputPath : String,
writeMode : FileSystem.WriteMode,
millis : Long,
rowDelimiter: String,
fieldDelimiter: String) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
......@@ -148,7 +100,7 @@ object OutputFormatTestPrograms {
val counts = wordCountProgram(text)
counts.writeAsCsv(outputPath, writeMode, millis, rowDelimiter, fieldDelimiter)
counts.writeAsCsv(outputPath, writeMode, rowDelimiter, fieldDelimiter)
env.execute("Scala WordCountToCsv")
}
......
......@@ -173,7 +173,7 @@ public class AccumulatorLiveITCase {
DataStream<String> input = env.fromCollection(inputData);
input
.flatMap(new NotifyingMapper())
.write(new NotifyingOutputFormat(), 1000).disableChaining();
.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
jobGraph = env.getStreamGraph().getJobGraph();
......