提交 177168b2 编写于 作者: E erli ding 提交者: zentol

[FLINK-2125][streaming] Delimiter change from char to string

This closes #2233
上级 588830aa
......@@ -59,7 +59,7 @@ public class SocketWindowWordCount {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, '\n');
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
......
......@@ -1105,10 +1105,37 @@ public abstract class StreamExecutionEnvironment {
* a negative value ensures retrying forever.
* @return A data stream containing the strings received from the socket
*/
@PublicEvolving
@Deprecated
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
return socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. On the termination of the socket server connection retries can be
* initiated.
* <p>
* Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
* the socket was gracefully terminated.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @param delimiter
* A string which splits received strings into records
* @param maxRetry
* The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
* Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
* while
* a negative value ensures retrying forever.
* @return A data stream containing the strings received from the socket
*/
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
"Socket Stream");
}
/**
......@@ -1124,14 +1151,32 @@ public abstract class StreamExecutionEnvironment {
* A character which splits received strings into records
* @return A data stream containing the strings received from the socket
*/
@PublicEvolving
@Deprecated
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set, using'\n' as delimiter. The reader is terminated immediately when
* decoded by the system's default character set. The reader is terminated immediately when the socket is down.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @param delimiter
* A string which splits received strings into records
* @return A data stream containing the strings received from the socket
*/
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set, using"\n" as delimiter. The reader is terminated immediately when
* the socket is down.
*
* @param hostname
......@@ -1143,7 +1188,7 @@ public abstract class StreamExecutionEnvironment {
*/
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, '\n');
return socketTextStream(hostname, port, "\n");
}
/**
......
......@@ -57,7 +57,7 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
private final String hostname;
private final int port;
private final char delimiter;
private final String delimiter;
private final long maxNumRetries;
private final long delayBetweenRetries;
......@@ -66,11 +66,11 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
private volatile boolean isRunning = true;
public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries) {
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) {
this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
}
public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries, long delayBetweenRetries) {
checkArgument(port > 0 && port < 65536, "port is out of range");
checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
......@@ -96,19 +96,19 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
int data;
while (isRunning && (data = reader.read()) != -1) {
// check if the string is complete
if (data != delimiter) {
buffer.append((char) data);
}
else {
char[] cbuf = new char[8192];
int bytesRead;
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
// truncate trailing carriage return
if (delimiter == '\n' && buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
buffer.setLength(buffer.length() - 1);
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
ctx.collect(buffer.toString());
buffer.setLength(0);
ctx.collect(record);
buffer.delete(0, delimPos + delimiter.length());
}
}
}
......
......@@ -45,7 +45,7 @@ public class SocketTextStreamFunctionTest {
Socket channel = null;
try {
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0);
SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
runner.start();
......@@ -79,7 +79,7 @@ public class SocketTextStreamFunctionTest {
Socket channel = null;
try {
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0);
SocketSourceThread runner = new SocketSourceThread(source);
runner.start();
......@@ -108,7 +108,7 @@ public class SocketTextStreamFunctionTest {
Socket channel = null;
try {
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10, 100);
SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
runner.start();
......@@ -152,7 +152,7 @@ public class SocketTextStreamFunctionTest {
Socket channel = null;
try {
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', -1, 100);
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", -1, 100);
SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
runner.start();
......@@ -196,7 +196,7 @@ public class SocketTextStreamFunctionTest {
Socket channel = null;
try {
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10, 100);
SocketSourceThread runner = new SocketSourceThread(source, "test1", "check1", "check2");
runner.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册