diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java index 105e7d9a5cd28b664a6baa8290ff8265d877f3ee..10e8ca0a536809c844c338ccc815778fe6bec5af 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java @@ -59,7 +59,7 @@ public class SocketWindowWordCount { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket - DataStream text = env.socketTextStream("localhost", port, '\n'); + DataStream text = env.socketTextStream("localhost", port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream windowCounts = text diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 53053b96a167a128627eb9e84a3545cec381cee8..cd3a0c35d153b8fd29f74d310856eb745b1082ae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1105,10 +1105,37 @@ public abstract class StreamExecutionEnvironment { * a negative value ensures retrying forever. 
* @return A data stream containing the strings received from the socket */ - @PublicEvolving + @Deprecated public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) { + return socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry); + } + + /** + * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are + * decoded by the system's default character set. On the termination of the socket server connection retries can be + * initiated. + *

+ * Note that the socket itself does not report on abort; as a consequence, retries are only initiated when + * the socket was gracefully terminated. + * + * @param hostname + * The host name which a server socket binds + * @param port + * The port number which a server socket binds. Must be between 1 and 65535; a port number of 0 is rejected + * as out of range. + * @param delimiter + * A string which splits received strings into records + * @param maxRetry + * The maximal retry interval in seconds while the program waits for a socket that is temporarily down. + * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated, + * while + * a value of -1 ensures retrying forever (values below -1 are rejected). + * @return A data stream containing the strings received from the socket + */ + @PublicEvolving + public DataStreamSource socketTextStream(String hostname, int port, String delimiter, long maxRetry) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), - "Socket Stream"); + "Socket Stream"); } /** @@ -1124,14 +1151,32 @@ public abstract class StreamExecutionEnvironment { * A character which splits received strings into records * @return A data stream containing the strings received from the socket */ - @PublicEvolving + @Deprecated public DataStreamSource socketTextStream(String hostname, int port, char delimiter) { return socketTextStream(hostname, port, delimiter, 0); } /** * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are - * decoded by the system's default character set, using'\n' as delimiter. The reader is terminated immediately when + * decoded by the system's default character set. The reader is terminated immediately when the socket is down. + * + * @param hostname + * The host name which a server socket binds + * @param port + * The port number which a server socket binds. 
Must be between 1 and 65535; a port number of 0 is rejected + * as out of range. + * @param delimiter + * A string which splits received strings into records + * @return A data stream containing the strings received from the socket + */ + @PublicEvolving + public DataStreamSource socketTextStream(String hostname, int port, String delimiter) { + return socketTextStream(hostname, port, delimiter, 0); + } + + /** + * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are + * decoded by the system's default character set, using "\n" as delimiter. The reader is terminated immediately when * the socket is down. * * @param hostname @@ -1143,7 +1188,7 @@ public abstract class StreamExecutionEnvironment { */ @PublicEvolving public DataStreamSource socketTextStream(String hostname, int port) { - return socketTextStream(hostname, port, '\n'); + return socketTextStream(hostname, port, "\n"); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 3ae54b661440c17e3c622d459ce2dad79ab442d0..d1fea1e0be1233f2c7f146aeb4e3032031e23287 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -57,7 +57,7 @@ public class SocketTextStreamFunction implements SourceFunction { private final String hostname; private final int port; - private final char delimiter; + private final String delimiter; private final long maxNumRetries; private final long delayBetweenRetries; @@ -66,11 +66,11 @@ public class SocketTextStreamFunction implements SourceFunction { private volatile boolean isRunning = true; - public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxNumRetries) { + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) { this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); } - public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) { + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries, long delayBetweenRetries) { checkArgument(port > 0 && port < 65536, "port is out of range"); checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)"); checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); @@ -96,19 +96,19 @@ public class SocketTextStreamFunction implements SourceFunction { socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - int data; - while (isRunning && (data = reader.read()) != -1) { - // check if the string is complete - if (data != delimiter) { - buffer.append((char) data); - } - else { + char[] cbuf = new char[8192]; + int bytesRead; + while (isRunning && (bytesRead = reader.read(cbuf)) != -1) { + buffer.append(cbuf, 0, bytesRead); + int delimPos; + while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { + String record = buffer.substring(0, delimPos); // truncate trailing carriage return - if (delimiter == '\n' && buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') { - buffer.setLength(buffer.length() - 1); + if (delimiter.equals("\n") && record.endsWith("\r")) { + record = record.substring(0, record.length() - 1); } - ctx.collect(buffer.toString()); - buffer.setLength(0); + ctx.collect(record); + buffer.delete(0, delimPos + delimiter.length()); } } } diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java index 33984516dd19eecfa95aa496f45efe0916a4705e..3e274cf782608bf74c97183edc34efd1b69cc614 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java @@ -45,7 +45,7 @@ public class SocketTextStreamFunctionTest { Socket channel = null; try { - SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0); + SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0); SocketSourceThread runner = new SocketSourceThread(source, "test1", "check"); runner.start(); @@ -79,7 +79,7 @@ public class SocketTextStreamFunctionTest { Socket channel = null; try { - SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0); + SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0); SocketSourceThread runner = new SocketSourceThread(source); runner.start(); @@ -108,7 +108,7 @@ public class SocketTextStreamFunctionTest { Socket channel = null; try { - SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100); + SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10, 100); SocketSourceThread runner = new SocketSourceThread(source, "test1", "check"); runner.start(); @@ -152,7 +152,7 @@ public class SocketTextStreamFunctionTest { Socket channel = null; try { - SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', -1, 100); + 
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", -1, 100); SocketSourceThread runner = new SocketSourceThread(source, "test1", "check"); runner.start(); @@ -196,7 +196,7 @@ public class SocketTextStreamFunctionTest { Socket channel = null; try { - SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100); + SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10, 100); SocketSourceThread runner = new SocketSourceThread(source, "test1", "check1", "check2"); runner.start();