提交 e845ee02 编写于 作者: wu-sheng's avatar wu-sheng

add network max-wait-time

上级 809e3d7b
......@@ -26,7 +26,7 @@ public class Agent2RoutingClient extends Thread {
private static ILog logger = LogManager.getLogger(Agent2RoutingClient.class);
private List<ServerAddr> addrList;
private Client client;
private Client client;
private SpanStorageClient spanStorageClient;
private NetworkListener listener;
private SendRequestSpanEventHandler requestSpanDataSupplier = null;
......@@ -43,7 +43,7 @@ public class Agent2RoutingClient extends Thread {
if (addrSegments.length != 2) {
throw new IllegalArgumentException("server addr should like ip:port, illegal addr:" + server);
}
addrList.add(new ServerAddr(addrSegments[0], addrSegments[2]));
addrList.add(new ServerAddr(addrSegments[0], addrSegments[1]));
}
listener = new NetworkListener();
}
......@@ -63,7 +63,7 @@ public class Agent2RoutingClient extends Thread {
private void connect() {
try {
if(client != null && !client.isShutdown()){
if (client != null && !client.isShutdown()) {
client.shutdown();
}
int addrIdx = new Random().nextInt(addrList.size());
......@@ -84,26 +84,33 @@ public class Agent2RoutingClient extends Thread {
List<RequestSpan> requestData = this.requestSpanDataSupplier.getBufferData();
List<AckSpan> ackData = this.ackSpanDataSupplier.getBufferData();
if (requestData.size() > 0 || ackData.size() > 0) {
boolean hasData = false;
if (requestData.size() > 0) {
hasData = true;
listener.begin();
spanStorageClient.sendRequestSpan(requestData);
listener.wait2Confirm();
}
if (ackData.size() > 0) {
hasData = true;
listener.begin();
spanStorageClient.sendACKSpan(ackData);
while (!listener.isBatchFinished()) {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
listener.wait2Confirm();
}
}
}
} else {
if(!hasData) {
try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException e) {
}
}
}
try {
......@@ -143,6 +150,21 @@ public class Agent2RoutingClient extends Thread {
HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.INFO, "batch send data to routing node.");
}
void wait2Confirm(){
// wait 20s, most.
int countDown = 100 * 20;
while (!listener.isBatchFinished()) {
try {
Thread.sleep(10L);
if(countDown-- < 0){
batchFinished = true;
}
} catch (InterruptedException e) {
}
}
}
}
......
......@@ -56,9 +56,14 @@ public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T
}
public void wait2Finish() {
// wait 20s, most
int countDown = 1000 * 20;
while (!previousSendFinish) {
try {
Thread.sleep(1L);
if(countDown-- < 0){
previousSendFinish = true;
}
} catch (InterruptedException e) {
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册