提交 e7996b0d 编写于 作者: A Aljoscha Krettek

[FLINK-7552] Extend SinkFunction interface with SinkContext

上级 9b0ba7ba
......@@ -36,6 +36,7 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import com.datastax.driver.core.Cluster;
......@@ -459,7 +460,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
sink.open(new Configuration());
for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
sink.invoke(value);
sink.invoke(value, SinkContextUtil.forTimestamp(0));
}
sink.close();
......
......@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
......@@ -117,7 +118,7 @@ public class FlinkKafkaProducerBaseTest {
producer.open(new Configuration());
verify(mockPartitioner, times(1)).open(0, 1);
producer.invoke("foobar");
producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
verify(mockPartitioner, times(1)).partition(
"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.rabbitmq.common;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
......@@ -91,7 +92,7 @@ public class RMQSinkTest {
public void invokePublishBytesToQueue() throws Exception {
RMQSink<String> rmqSink = createRMQSink();
rmqSink.invoke(MESSAGE_STR);
rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
verify(serializationSchema).serialize(MESSAGE_STR);
verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
}
......@@ -101,7 +102,7 @@ public class RMQSinkTest {
RMQSink<String> rmqSink = createRMQSink();
doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
rmqSink.invoke("msg");
rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
}
@Test
......@@ -110,7 +111,7 @@ public class RMQSinkTest {
rmqSink.setLogFailuresOnly(true);
doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
rmqSink.invoke("msg");
rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
}
@Test
......
......@@ -27,7 +27,4 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
public abstract void invoke(IN value) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.Internal;
/**
 * Utility for creating Sink {@link SinkFunction.Context Contexts}.
 *
 * <p>The contexts created here only carry a record timestamp, which makes them suitable for
 * calling {@link SinkFunction#invoke(Object, SinkFunction.Context)} directly, outside of a
 * running operator.
 */
@Internal
public final class SinkContextUtil {

	/** Static utility class; not meant to be instantiated. */
	private SinkContextUtil() {
	}

	/**
	 * Creates a {@link SinkFunction.Context} that reports the given record timestamp and
	 * throws an exception when trying to access the current watermark or processing time.
	 *
	 * @param timestamp The value returned by {@link SinkFunction.Context#timestamp()}.
	 * @param <T> The element type of the sink the context is created for.
	 * @return A context whose {@code hasTimestamp()} is always {@code true}.
	 */
	public static <T> SinkFunction.Context<T> forTimestamp(final long timestamp) {
		return new SinkFunction.Context<T>() {
			@Override
			public long currentProcessingTime() {
				// There is no time service behind this context.
				throw new UnsupportedOperationException("Not implemented");
			}

			@Override
			public long currentWatermark() {
				// There is no watermark tracking behind this context.
				throw new UnsupportedOperationException("Not implemented");
			}

			@Override
			public long timestamp() {
				return timestamp;
			}

			@Override
			public boolean hasTimestamp() {
				return true;
			}
		};
	}
}
......@@ -35,6 +35,52 @@ public interface SinkFunction<IN> extends Function, Serializable {
*
* @param value The input record.
* @throws Exception
* @deprecated Use {@link #invoke(Object, Context)}.
*/
void invoke(IN value) throws Exception;
@Deprecated
default void invoke(IN value) throws Exception {
}
/**
* Writes the given value to the sink. This function is called for every record.
*
* <p>The default implementation ignores {@code context} and forwards the record to
* {@link #invoke(Object)}.
*
* @param value The input record.
* @param context Additional context about the input record.
* @throws Exception Whatever the implementation throws; the default propagates exceptions
* raised by {@link #invoke(Object)}.
*/
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
/**
* Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
* an input record.
*
* <p>The context is only valid for the duration of a
* {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use it
* afterwards!
*
* @param <T> The type of elements accepted by the sink.
*/
@Public // Interface might be extended in the future with additional methods.
interface Context<T> {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/**
* Returns the timestamp of the current input record.
*
* <p>Call {@link #hasTimestamp()} first to find out whether the record carries a timestamp.
* NOTE(review): what this method does for a record without a timestamp is not specified
* here and is left to the implementation.
*
* @return The timestamp of the current input record.
*/
long timestamp();
/**
* Checks whether this record has a timestamp.
*
* @return True if the record has a timestamp, false if not.
*/
boolean hasTimestamp();
}
}
......@@ -19,8 +19,10 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
/**
* A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
......@@ -31,14 +33,27 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
private static final long serialVersionUID = 1L;
private transient SimpleContext sinkContext;
/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;
public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
}
/**
* Opens the operator via the superclass and then creates the {@link SimpleContext} that is
* passed to the sink function, backed by this operator's processing time service.
*/
@Override
public void open() throws Exception {
super.open();
// Created once here; the same context instance is handed to the sink function per record.
this.sinkContext = new SimpleContext<>(getProcessingTimeService());
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
userFunction.invoke(element.getValue());
sinkContext.element = element;
userFunction.invoke(element.getValue(), sinkContext);
}
@Override
......@@ -48,4 +63,47 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
// sinks don't forward latency markers
}
/**
* Lets the superclass handle the watermark first and then remembers its timestamp, so that the
* sink context can report it as the current watermark.
*/
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
}
/**
 * {@link SinkFunction.Context} handed to the sink function by the enclosing operator.
 *
 * <p>This is deliberately an inner (non-static) class: {@link #currentWatermark()} reads the
 * watermark tracked by the enclosing operator instance.
 *
 * @param <T> The type of the records exposed through this context. Named differently from the
 *            operator's own type parameter so that it does not shadow it.
 */
private class SimpleContext<T> implements SinkFunction.Context<T> {

	/** The record the context currently describes; assigned from outside this class. */
	private StreamRecord<T> element;

	/** Source of the value returned by {@link #currentProcessingTime()}. */
	private final ProcessingTimeService processingTimeService;

	/**
	 * Creates a context that answers processing-time queries from the given service.
	 *
	 * @param processingTimeService The service queried for the current processing time.
	 */
	public SimpleContext(ProcessingTimeService processingTimeService) {
		this.processingTimeService = processingTimeService;
	}

	@Override
	public long currentProcessingTime() {
		return processingTimeService.getCurrentProcessingTime();
	}

	@Override
	public long currentWatermark() {
		return currentWatermark;
	}

	/**
	 * Returns the timestamp of the current record.
	 *
	 * @throws IllegalStateException If the current record carries no timestamp.
	 */
	@Override
	public long timestamp() {
		if (!element.hasTimestamp()) {
			throw new IllegalStateException(
				"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
					"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
		return element.getTimestamp();
	}

	@Override
	public boolean hasTimestamp() {
		return element.hasTimestamp();
	}
}
}
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.After;
......@@ -40,7 +41,7 @@ public class PrintSinkFunctionTest {
private String line = System.lineSeparator();
@Test
public void testPrintSinkStdOut(){
public void testPrintSinkStdOut() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream stream = new PrintStream(baos);
System.setOut(stream);
......@@ -55,7 +56,7 @@ public class PrintSinkFunctionTest {
Assert.fail();
}
printSink.setTargetToStandardOut();
printSink.invoke("hello world!");
printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
assertEquals("Print to System.out", printSink.toString());
assertEquals("hello world!" + line, baos.toString());
......@@ -65,7 +66,7 @@ public class PrintSinkFunctionTest {
}
@Test
public void testPrintSinkStdErr(){
public void testPrintSinkStdErr() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream stream = new PrintStream(baos);
System.setOut(stream);
......@@ -80,7 +81,7 @@ public class PrintSinkFunctionTest {
Assert.fail();
}
printSink.setTargetToStandardErr();
printSink.invoke("hello world!");
printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
assertEquals("Print to System.err", printSink.toString());
assertEquals("hello world!" + line, baos.toString());
......@@ -90,7 +91,7 @@ public class PrintSinkFunctionTest {
}
@Test
public void testPrintSinkWithPrefix(){
public void testPrintSinkWithPrefix() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream stream = new PrintStream(baos);
System.setOut(stream);
......@@ -107,7 +108,7 @@ public class PrintSinkFunctionTest {
Assert.fail();
}
printSink.setTargetToStandardErr();
printSink.invoke("hello world!");
printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
assertEquals("Print to System.err", printSink.toString());
assertEquals("2> hello world!" + line, baos.toString());
......
......@@ -74,7 +74,7 @@ public class SocketClientSinkTest extends TestLogger {
try {
SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
simpleSink.open(new Configuration());
simpleSink.invoke(TEST_MESSAGE + '\n');
simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
simpleSink.close();
}
catch (Throwable t) {
......@@ -117,7 +117,7 @@ public class SocketClientSinkTest extends TestLogger {
public void run() {
try {
// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
simpleSink.invoke(TEST_MESSAGE + '\n');
simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
}
catch (Throwable t) {
error.set(t);
......@@ -182,7 +182,7 @@ public class SocketClientSinkTest extends TestLogger {
// socket should be closed, so this should trigger a re-try
// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
while (true) { // we have to do this more often as the server side closed is not guaranteed to be noticed immediately
simpleSink.invoke(TEST_MESSAGE + '\n');
simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
}
}
catch (IOException e) {
......@@ -238,7 +238,7 @@ public class SocketClientSinkTest extends TestLogger {
// Initial payload => this will be received by the server and then the socket will be
// closed.
sink.invoke("0\n");
sink.invoke("0\n", SinkContextUtil.forTimestamp(0));
// Get the future and make sure there was no problem. This will rethrow any Exceptions from
// the server.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;
/**
 * Unit tests for the {@link StreamSink} operator.
 */
public class StreamSinkOperatorTest extends TestLogger {

	@Rule
	public ExpectedException expectedException = ExpectedException.none();

	/**
	 * Pushes watermarks, processing-time updates and records through the operator and checks
	 * that the sink function observes the matching watermark, processing time and record
	 * timestamp on its context.
	 */
	@Test
	public void testTimeQuerying() throws Exception {
		BufferingQueryingSink<String> recordingSink = new BufferingQueryingSink<>();
		StreamSink<String> sinkOperator = new StreamSink<>(recordingSink);

		OneInputStreamOperatorTestHarness<String, Object> harness =
			new OneInputStreamOperatorTestHarness<>(sinkOperator);
		harness.setup();
		harness.open();

		advanceAndProcess(harness, 17, 12, new StreamRecord<>("Hello", 12L));
		advanceAndProcess(harness, 42, 15, new StreamRecord<>("Ciao", 13L));
		// The last record deliberately carries no timestamp.
		advanceAndProcess(harness, 42, 15, new StreamRecord<>("Ciao"));

		assertThat(recordingSink.data.size(), is(3));
		assertThat(recordingSink.data,
			contains(
				new Tuple4<>(17L, 12L, 12L, "Hello"),
				new Tuple4<>(42L, 15L, 13L, "Ciao"),
				new Tuple4<>(42L, 15L, null, "Ciao")));

		harness.close();
	}

	/**
	 * Emits a watermark, sets the processing time and then feeds one record, in that order.
	 */
	private static void advanceAndProcess(
			OneInputStreamOperatorTestHarness<String, Object> harness,
			long watermark,
			long processingTime,
			StreamRecord<String> record) throws Exception {
		harness.processWatermark(new Watermark(watermark));
		harness.setProcessingTime(processingTime);
		harness.processElement(record);
	}

	/**
	 * Sink that stores, for every record, what its context reported at invocation time.
	 */
	private static class BufferingQueryingSink<T> implements SinkFunction<T> {

		// watermark, processing-time, timestamp, event
		private final List<Tuple4<Long, Long, Long, T>> data = new ArrayList<>();

		@Override
		public void invoke(T value, Context context) throws Exception {
			final boolean timestamped = context.hasTimestamp();
			final long watermark = context.currentWatermark();
			final long processingTime = context.currentProcessingTime();
			// Only ask for the timestamp when there is one; otherwise record null.
			final Long timestamp = timestamped ? Long.valueOf(context.timestamp()) : null;

			data.add(new Tuple4<>(watermark, processingTime, timestamp, value));
		}
	}
}
......@@ -1457,6 +1457,8 @@ under the License.
<excludes>
<exclude>@org.apache.flink.annotation.PublicEvolving</exclude>
<exclude>@org.apache.flink.annotation.Internal</exclude>
<exclude>org.apache.flink.streaming.api.functions.sink.RichSinkFunction#invoke(java.lang.Object)</exclude>
<exclude>org.apache.flink.streaming.api.functions.sink.SinkFunction</exclude>
</excludes>
<accessModifier>public</accessModifier>
<breakBuildOnModifications>false</breakBuildOnModifications>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册