提交 d182d042 编写于 作者: M mbalassi

[FLINK-1582][streaming] SocketStream minor enhancements

This closes #424
上级 fd65a241
...@@ -374,7 +374,12 @@ public abstract class StreamExecutionEnvironment { ...@@ -374,7 +374,12 @@ public abstract class StreamExecutionEnvironment {
/** /**
* Creates a new DataStream that contains the strings received infinitely * Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default * from socket. Received strings are decoded by the system's default
* character set. * character set. On the termination of the socket server connection retries
* can be initiated.
*
* <p>Let us note that the socket itself does not report on abort and
* as a consequence retries are only initiated when the socket was gracefully
* terminated.</p>
* *
* @param hostname * @param hostname
* The host name which a server socket bind. * The host name which a server socket bind.
...@@ -384,12 +389,14 @@ public abstract class StreamExecutionEnvironment { ...@@ -384,12 +389,14 @@ public abstract class StreamExecutionEnvironment {
* @param delimiter * @param delimiter
* A character which split received strings into records. * A character which split received strings into records.
* @param maxRetry * @param maxRetry
* The maximal retry number when the socket is down. Reconnection is * The maximal retry interval in seconds while the program waits for
* tried in every 5 seconds. A number of 0 means that the reader * a socket that is temporarily down. Reconnection is initiated every
* is immediately terminated. * second. A number of 0 means that the reader is immediately
* terminated, while a negative value ensures retrying forever.
* @return A DataStream, containing the strings received from socket. * @return A DataStream, containing the strings received from socket.
*
*/ */
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, int maxRetry) { public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null, return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
"Socket Stream"); "Socket Stream");
} }
......
...@@ -37,19 +37,18 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { ...@@ -37,19 +37,18 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private String hostname; private String hostname;
private int port; private int port;
private char delimiter; private char delimiter;
private int maxRetry; private long maxRetry;
private boolean retryForever;
private Socket socket; private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
public SocketTextStreamFunction(String hostname, int port, char delimiter) { public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
this(hostname, port, delimiter, 0);
}
public SocketTextStreamFunction(String hostname, int port, char delimiter, int maxRetry) {
this.hostname = hostname; this.hostname = hostname;
this.port = port; this.port = port;
this.delimiter = delimiter; this.delimiter = delimiter;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
this.retryForever = maxRetry < 0;
} }
@Override @Override
...@@ -73,17 +72,19 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { ...@@ -73,17 +72,19 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
int data = reader.read(); int data = reader.read();
if (data == -1) { if (data == -1) {
socket.close(); socket.close();
int retry = 0; long retry = 0;
boolean success = false; boolean success = false;
while (retry < maxRetry && !success) { while (retry < maxRetry && !success) {
if (!retryForever) {
retry++; retry++;
LOG.warn("Lost connection to server socket. Retrying in 5 seconds..."); }
LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
try { try {
socket = new Socket(); socket = new Socket();
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
success = true; success = true;
} catch (ConnectException ce) { } catch (ConnectException ce) {
Thread.sleep(5000); Thread.sleep(CONNECTION_RETRY_SLEEP);
} }
} }
......
...@@ -63,7 +63,7 @@ public class SourceTest { ...@@ -63,7 +63,7 @@ public class SourceTest {
when(socket.isClosed()).thenReturn(false); when(socket.isClosed()).thenReturn(false);
when(socket.isConnected()).thenReturn(true); when(socket.isConnected()).thenReturn(true);
new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>( new SocketTextStreamFunction("", 0, '\n', 0).streamFromSocket(new MockCollector<String>(
actualList), socket); actualList), socket);
assertEquals(expectedList, actualList); assertEquals(expectedList, actualList);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册