未验证 提交 eb199f78 编写于 作者: H He Wang 提交者: GitHub

fix & optimize ClientStream process (#37)

* optimize process thread

* revert checkpoint update
上级 aa5a0783
...@@ -32,11 +32,12 @@ public class ClientStream { ...@@ -32,11 +32,12 @@ public class ClientStream {
/** Flag of whether the stream is started. */ /** Flag of whether the stream is started. */
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
/** The process thread. */ /** The process thread. */
private Thread thread = null; private Thread thread = null;
/** Context of stream. */ /** Context of stream. */
private StreamContext context; private final StreamContext context;
/** Checkpoint string used to resume writing into the queue. */ /** Checkpoint string used to resume writing into the queue. */
private String checkpointString; private String checkpointString;
...@@ -79,10 +80,10 @@ public class ClientStream { ...@@ -79,10 +80,10 @@ public class ClientStream {
this.context = new StreamContext(this, clientConf, connectionParams); this.context = new StreamContext(this, clientConf, connectionParams);
} }
/** Close and wait the connection. */ /** Close the connection and wait the process thread. */
public void stop() { public void stop() {
if (!started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
logger.info("stopping LogProxy Client...."); logger.info("Try to stop this client");
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
...@@ -91,8 +92,8 @@ public class ClientStream { ...@@ -91,8 +92,8 @@ public class ClientStream {
join(); join();
thread = null; thread = null;
logger.info("Client stopped successfully");
} }
logger.info("stopped LogProxy Client");
} }
/** Call {@link Thread#join()} method of process thread. */ /** Call {@link Thread#join()} method of process thread. */
...@@ -101,8 +102,8 @@ public class ClientStream { ...@@ -101,8 +102,8 @@ public class ClientStream {
try { try {
thread.join(); thread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("ClientStream thread is interrupted: " + e.getMessage()); logger.warn("Waits for process thread failed : {}", e.getMessage());
stop(); triggerStop();
} }
} }
} }
...@@ -140,11 +141,10 @@ public class ClientStream { ...@@ -140,11 +141,10 @@ public class ClientStream {
while (isRunning()) { while (isRunning()) {
ReconnectState state = reconnect(); ReconnectState state = reconnect();
if (state == ReconnectState.EXIT) { if (state == ReconnectState.EXIT) {
logger.error("read thread to exit");
triggerException( triggerException(
new LogProxyClientException( new LogProxyClientException(
ErrorCode.E_MAX_RECONNECT, ErrorCode.E_MAX_RECONNECT,
"exceed max reconnect retry")); "Exceed max retry times"));
break; break;
} }
if (state == ReconnectState.RETRY) { if (state == ReconnectState.RETRY) {
...@@ -157,8 +157,8 @@ public class ClientStream { ...@@ -157,8 +157,8 @@ public class ClientStream {
continue; continue;
} }
StreamContext.TransferPacket packet; StreamContext.TransferPacket packet = null;
while (true) { while (isRunning()) {
try { try {
packet = packet =
context.recordQueue() context.recordQueue()
...@@ -193,33 +193,24 @@ public class ClientStream { ...@@ -193,33 +193,24 @@ public class ClientStream {
+ packet.getType()); + packet.getType());
} }
} catch (LogProxyClientException e) { } catch (LogProxyClientException e) {
triggerStop();
triggerException(e); triggerException(e);
return; break;
} catch (Exception e) { } catch (Exception e) {
// if exception occurred, we exit
triggerStop();
triggerException( triggerException(
new LogProxyClientException(ErrorCode.E_USER, e)); new LogProxyClientException(ErrorCode.E_USER, e));
return; break;
} }
} }
started.set(false); triggerStop();
if (connection != null) { logger.info("Client process thread exit");
connection.close();
}
thread = null;
// TODO... if exception occurred, run handler callback
logger.warn("!!! read thread exit !!!");
}); });
thread.setDaemon(false); thread.setDaemon(false);
thread.start(); thread.start();
} }
// add a shutdown hook to trigger the stop the process
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
} }
/** /**
...@@ -239,15 +230,14 @@ public class ClientStream { ...@@ -239,15 +230,14 @@ public class ClientStream {
private ReconnectState reconnect() { private ReconnectState reconnect() {
// reconnect flag mark, tiny load for checking // reconnect flag mark, tiny load for checking
if (reconnect.compareAndSet(true, false)) { if (reconnect.compareAndSet(true, false)) {
logger.warn("start to reconnect..."); logger.info("Try to reconnect");
try { try {
if (context.config().getMaxReconnectTimes() != -1 if (context.config().getMaxReconnectTimes() != -1
&& retryTimes >= context.config().getMaxReconnectTimes()) { && retryTimes >= context.config().getMaxReconnectTimes()) {
logger.error( logger.error(
"failed to reconnect, exceed max reconnect retry time: {}", "Failed to reconnect, exceed max reconnect retry time: {}",
context.config().getMaxReconnectTimes()); context.config().getMaxReconnectTimes());
reconnect.set(true);
return ReconnectState.EXIT; return ReconnectState.EXIT;
} }
...@@ -261,32 +251,30 @@ public class ClientStream { ...@@ -261,32 +251,30 @@ public class ClientStream {
logger.warn("reconnect set checkpoint: {}", checkpointString); logger.warn("reconnect set checkpoint: {}", checkpointString);
context.params().updateCheckpoint(checkpointString); context.params().updateCheckpoint(checkpointString);
} }
connection = ConnectionFactory.instance().createConnection(context); connection = ConnectionFactory.instance().createConnection(context);
if (connection != null) { if (connection != null) {
logger.warn("reconnect SUCC"); logger.info("Reconnect successfully");
retryTimes = 0; retryTimes = 0;
reconnect.compareAndSet(true, false);
return ReconnectState.SUCCESS; return ReconnectState.SUCCESS;
} }
logger.error( logger.error(
"failed to reconnect, retry count: {}, max: {}", "Failed to reconnect, retry count: {}, max: {}",
++retryTimes, ++retryTimes,
context.config().getMaxReconnectTimes()); context.config().getMaxReconnectTimes());
// not success, retry next time // not success, retry next time
reconnect.set(true); reconnect.set(true);
return ReconnectState.RETRY; return ReconnectState.RETRY;
} catch (Exception e) { } catch (Exception e) {
logger.error( logger.error(
"failed to reconnect, retry count: {}, max: {}, message: {}", "Failed to reconnect, retry count: {}, max: {}, message: {}",
++retryTimes, ++retryTimes,
context.config().getMaxReconnectTimes(), context.config().getMaxReconnectTimes(),
e); e);
// not success, retry next time // not success, retry next time
reconnect.set(true); reconnect.set(true);
return ReconnectState.RETRY; return ReconnectState.RETRY;
} finally { } finally {
reconnecting.set(false); reconnecting.set(false);
} }
......
...@@ -50,10 +50,7 @@ public class LogProxyClientTest { ...@@ -50,10 +50,7 @@ public class LogProxyClientTest {
@Override @Override
public void onException(LogProxyClientException e) { public void onException(LogProxyClientException e) {
if (e.needStop()) { logger.error(e.getMessage());
logger.error(e.getMessage());
client.stop();
}
} }
}); });
client.start(); client.start();
...@@ -83,10 +80,7 @@ public class LogProxyClientTest { ...@@ -83,10 +80,7 @@ public class LogProxyClientTest {
@Override @Override
public void onException(LogProxyClientException e) { public void onException(LogProxyClientException e) {
if (e.needStop()) { logger.error(e.getMessage());
logger.error(e.getMessage());
client.stop();
}
} }
}); });
client.start(); client.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册