diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8487fa5715b46450e4a6b9a28841ca4c60ac0e3e..b43ae374afb3ed37b2a8bb6d0b6325e05c8a9757 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -374,7 +374,12 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
- * character set.
+ * character set. On the termination of the socket server connection retries
+ * can be initiated.
+ *
+ *
Let us note that the socket itself does not report on abort and
+ * as a consequence retries are only initiated when the socket was gracefully
+ * terminated.
*
* @param hostname
* The host name which a server socket bind.
@@ -384,12 +389,14 @@ public abstract class StreamExecutionEnvironment {
* @param delimiter
* A character which split received strings into records.
* @param maxRetry
- * The maximal retry number when the socket is down. Reconnection is
- * tried in every 5 seconds. A number of 0 means that the reader
- * is immediately terminated.
+ * The maximal retry interval in seconds while the program waits for
+ * a socket that is temporarily down. Reconnection is initiated every
+ * second. A number of 0 means that the reader is immediately
+ * terminated, while a negative value ensures retrying forever.
* @return A DataStream, containing the strings received from socket.
+ *
*/
- public DataStreamSource socketTextStream(String hostname, int port, char delimiter, int maxRetry) {
+ public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
"Socket Stream");
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index d9955222a3adfe0e826e67bebd0dcc4f06e4a1f2..3253c012952e8bf9ae9fcfe9a536503c6972ffe0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -37,19 +37,18 @@ public class SocketTextStreamFunction extends RichSourceFunction {
private String hostname;
private int port;
private char delimiter;
- private int maxRetry;
+ private long maxRetry;
+ private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
+ private static final int CONNECTION_RETRY_SLEEP = 1000;
- public SocketTextStreamFunction(String hostname, int port, char delimiter) {
- this(hostname, port, delimiter, 0);
- }
-
- public SocketTextStreamFunction(String hostname, int port, char delimiter, int maxRetry) {
+ public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
this.hostname = hostname;
this.port = port;
this.delimiter = delimiter;
this.maxRetry = maxRetry;
+ this.retryForever = maxRetry < 0;
}
@Override
@@ -73,17 +72,19 @@ public class SocketTextStreamFunction extends RichSourceFunction {
int data = reader.read();
if (data == -1) {
socket.close();
- int retry = 0;
+ long retry = 0;
boolean success = false;
while (retry < maxRetry && !success) {
- retry++;
- LOG.warn("Lost connection to server socket. Retrying in 5 seconds...");
+ if (!retryForever) {
+ retry++;
+ }
+ LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
try {
socket = new Socket();
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
success = true;
} catch (ConnectException ce) {
- Thread.sleep(5000);
+ Thread.sleep(CONNECTION_RETRY_SLEEP);
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
index 9be7de6dcbdee386e6ef1c8675f21f137deb3681..792c5d2d58e47baf8c3be0528303d07642357d82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -63,7 +63,7 @@ public class SourceTest {
when(socket.isClosed()).thenReturn(false);
when(socket.isConnected()).thenReturn(true);
- new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector(
+ new SocketTextStreamFunction("", 0, '\n', 0).streamFromSocket(new MockCollector(
actualList), socket);
assertEquals(expectedList, actualList);
}