提交 18004343 编写于 作者: H HuangWHWHW 提交者: Maximilian Michels

[FLINK-2490][streaming] fix retryForever check in SocketStreamFunction

This closes #992.
上级 e96e5c0b
......@@ -42,7 +42,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
static int CONNECTION_RETRY_SLEEP = 1000;
protected long retries;
private volatile boolean isRunning;
......@@ -67,9 +68,9 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
streamFromSocket(ctx, socket);
}
public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
private void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
try {
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
BufferedReader reader = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
......@@ -87,11 +88,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
if (data == -1) {
socket.close();
long retry = 0;
boolean success = false;
while (retry < maxRetry && !success) {
retries = 0;
while ((retries < maxRetry || retryForever) && !success) {
if (!retryForever) {
retry++;
retries++;
}
LOG.warn("Lost connection to server socket. Retrying in "
+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
......@@ -118,7 +119,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
if (data == delimiter) {
ctx.collect(buffer.toString());
buffer = new StringBuffer();
buffer = new StringBuilder();
} else if (data != '\r') { // ignore carriage return
buffer.append((char) data);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions;
import java.io.IOException;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
*/
public class SocketClientSinkTest{
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
private final String host = "127.0.0.1";
private int port;
private String access;
private String value;
public SocketServer.ServerThread th;
public SocketClientSinkTest() {
}
class SocketServer extends Thread {
private ServerSocket server;
private Socket sk;
private BufferedReader rdr;
private SocketServer() {
try {
this.server = new ServerSocket(0);
port = server.getLocalPort();
} catch (Exception e) {
error.set(e);
}
}
public void run() {
try {
sk = server.accept();
access = "Connected";
th = new ServerThread(sk);
th.start();
} catch (Exception e) {
error.set(e);
}
}
class ServerThread extends Thread {
Socket sk;
public ServerThread(Socket sk) {
this.sk = sk;
}
public void run() {
try {
rdr = new BufferedReader(new InputStreamReader(sk
.getInputStream()));
value = rdr.readLine();
} catch (IOException e) {
error.set(e);
}
}
}
}
@Test
public void testSocketSink() throws Exception{
SocketServer server = new SocketServer();
server.start();
SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
@Override
public byte[] serialize(String element) {
return element.getBytes();
}
};
SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
simpleSink.open(new Configuration());
simpleSink.invoke("testSocketSinkInvoke");
simpleSink.close();
server.join();
th.join();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
assertEquals("Connected", this.access);
assertEquals("testSocketSinkInvoke", value);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.source;
import java.io.DataOutputStream;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static java.lang.Thread.sleep;
import static org.junit.Assert.*;
import static org.mockito.Mockito.verify;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
*/
public class SocketTextStreamFunctionTest{
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
private final String host = "127.0.0.1";
private final SourceFunction.SourceContext<String> ctx = Mockito.mock(SourceFunction.SourceContext.class);
public SocketTextStreamFunctionTest() {
}
class SocketSource extends Thread {
SocketTextStreamFunction socketSource;
public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
}
public void run() {
try {
this.socketSource.open(new Configuration());
this.socketSource.run(ctx);
}catch(Exception e){
error.set(e);
}
}
public void cancel(){
this.socketSource.cancel();
}
}
@Test
public void testSocketSourceRetryForever() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, -1);
source.start();
int count = 0;
Socket channel;
while (count < 100) {
channel = serverSo.accept();
count++;
channel.close();
assertEquals(0, source.socketSource.retries);
}
source.cancel();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
assertEquals(100, count);
}
@Test
public void testSocketSourceRetryTenTimes() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 10);
source.socketSource.CONNECTION_RETRY_SLEEP = 200;
assertEquals(0, source.socketSource.retries);
source.start();
Socket channel;
channel = serverSo.accept();
channel.close();
serverSo.close();
while(source.socketSource.retries < 10){
long lastRetry = source.socketSource.retries;
sleep(100);
assertTrue(source.socketSource.retries >= lastRetry);
};
assertEquals(10, source.socketSource.retries);
source.cancel();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
assertEquals(10, source.socketSource.retries);
}
@Test
public void testSocketSourceNeverRetry() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 0);
source.start();
Socket channel;
channel = serverSo.accept();
channel.close();
serverSo.close();
sleep(2000);
source.cancel();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
assertEquals(0, source.socketSource.retries);
}
@Test
public void testSocketSourceRetryTenTimesWithFirstPass() throws Exception{
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 10);
source.socketSource.CONNECTION_RETRY_SLEEP = 200;
assertEquals(0, source.socketSource.retries);
source.start();
Socket channel;
channel = serverSo.accept();
DataOutputStream dataOutputStream = new DataOutputStream(channel.getOutputStream());
dataOutputStream.write("testFirstSocketpass\n".getBytes());
channel.close();
serverSo.close();
while(source.socketSource.retries < 10){
long lastRetry = source.socketSource.retries;
sleep(100);
assertTrue(source.socketSource.retries >= lastRetry);
};
assertEquals(10, source.socketSource.retries);
source.cancel();
verify(ctx).collect(argument.capture());
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
assertEquals("testFirstSocketpass", argument.getValue());
assertEquals(10, source.socketSource.retries);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册