From d182d0426451d43be17cd41a1c8db49b9bfc4b7d Mon Sep 17 00:00:00 2001 From: mbalassi Date: Wed, 25 Feb 2015 12:29:17 +0100 Subject: [PATCH] [FLINK-1582][streaming] SocketStream minor enhancements This closes #424 --- .../StreamExecutionEnvironment.java | 17 ++++++++++----- .../source/SocketTextStreamFunction.java | 21 ++++++++++--------- .../flink/streaming/api/SourceTest.java | 2 +- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 8487fa5715b..b43ae374afb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -374,7 +374,12 @@ public abstract class StreamExecutionEnvironment { /** * Creates a new DataStream that contains the strings received infinitely * from socket. Received strings are decoded by the system's default - * character set. + * character set. On the termination of the socket server connection retries + * can be initiated. + * + *

Let us note that the socket itself does not report on abort and + * as a consequence retries are only initiated when the socket was gracefully + * terminated.

* * @param hostname * The host name which a server socket bind. @@ -384,12 +389,14 @@ public abstract class StreamExecutionEnvironment { * @param delimiter * A character which split received strings into records. * @param maxRetry - * The maximal retry number when the socket is down. Reconnection is - * tried in every 5 seconds. A number of 0 means that the reader - * is immediately terminated. + * The maximal retry interval in seconds while the program waits for + * a socket that is temporarily down. Reconnection is initiated every + * second. A number of 0 means that the reader is immediately + * terminated, while a negative value ensures retrying forever. * @return A DataStream, containing the strings received from socket. + * */ - public DataStreamSource socketTextStream(String hostname, int port, char delimiter, int maxRetry) { + public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null, "Socket Stream"); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java index d9955222a3a..3253c012952 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java @@ -37,19 +37,18 @@ public class SocketTextStreamFunction extends RichSourceFunction { private String hostname; private int port; private char delimiter; - private int maxRetry; + private long maxRetry; + private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; + private static final int CONNECTION_RETRY_SLEEP = 1000; - public SocketTextStreamFunction(String hostname, int port, char delimiter) { - this(hostname, port, delimiter, 0); - } - - public SocketTextStreamFunction(String hostname, int port, char delimiter, int maxRetry) { + public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; this.maxRetry = maxRetry; + this.retryForever = maxRetry < 0; } @Override @@ -73,17 +72,19 @@ public class SocketTextStreamFunction extends RichSourceFunction { int data = reader.read(); if (data == -1) { socket.close(); - int retry = 0; + long retry = 0; boolean success = false; while (retry < maxRetry && !success) { - retry++; - LOG.warn("Lost connection to server socket. Retrying in 5 seconds..."); + if (!retryForever) { + retry++; + } + LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds..."); try { socket = new Socket(); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); success = true; } catch (ConnectException ce) { - Thread.sleep(5000); + Thread.sleep(CONNECTION_RETRY_SLEEP); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java index 9be7de6dcbd..792c5d2d58e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java @@ -63,7 +63,7 @@ public class SourceTest { when(socket.isClosed()).thenReturn(false); when(socket.isConnected()).thenReturn(true); - new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector( + new SocketTextStreamFunction("", 0, '\n', 0).streamFromSocket(new MockCollector( actualList), socket); assertEquals(expectedList, actualList); } -- GitLab