From eb199f78ec74875e83fe584ce77e218686351df1 Mon Sep 17 00:00:00 2001 From: He Wang Date: Fri, 20 May 2022 10:50:42 +0800 Subject: [PATCH] fix & optimize ClientStream process (#37) * optimize process thread * revert checkpoint update --- .../client/connection/ClientStream.java | 58 ++++++++----------- .../clogproxy/client/LogProxyClientTest.java | 10 +--- 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index e9fc042..15f3d53 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -32,11 +32,12 @@ public class ClientStream { /** Flag of whether the stream is started. */ private final AtomicBoolean started = new AtomicBoolean(false); + /** The process thread. */ private Thread thread = null; /** Context of stream. */ - private StreamContext context; + private final StreamContext context; /** Checkpoint string used to resume writing into the queue. */ private String checkpointString; @@ -79,10 +80,10 @@ public class ClientStream { this.context = new StreamContext(this, clientConf, connectionParams); } - /** Close and wait the connection. */ + /** Close the connection and wait the process thread. */ public void stop() { - if (!started.compareAndSet(true, false)) { - logger.info("stopping LogProxy Client...."); + if (started.compareAndSet(true, false)) { + logger.info("Try to stop this client"); if (connection != null) { connection.close(); @@ -91,8 +92,8 @@ public class ClientStream { join(); thread = null; + logger.info("Client stopped successfully"); } - logger.info("stopped LogProxy Client"); } /** Call {@link Thread#join()} method of process thread. */ @@ -101,8 +102,8 @@ public class ClientStream { try { thread.join(); } catch (InterruptedException e) { - logger.warn("ClientStream thread is interrupted: " + e.getMessage()); - stop(); + logger.warn("Waits for process thread failed : {}", e.getMessage()); + triggerStop(); } } } @@ -140,11 +141,10 @@ public class ClientStream { while (isRunning()) { ReconnectState state = reconnect(); if (state == ReconnectState.EXIT) { - logger.error("read thread to exit"); triggerException( new LogProxyClientException( ErrorCode.E_MAX_RECONNECT, - "exceed max reconnect retry")); + "Exceed max retry times")); break; } if (state == ReconnectState.RETRY) { @@ -157,8 +157,8 @@ public class ClientStream { continue; } - StreamContext.TransferPacket packet; - while (true) { + StreamContext.TransferPacket packet = null; + while (isRunning()) { try { packet = context.recordQueue() @@ -193,33 +193,24 @@ public class ClientStream { + packet.getType()); } } catch (LogProxyClientException e) { - triggerStop(); triggerException(e); - return; - + break; } catch (Exception e) { - // if exception occurred, we exit - triggerStop(); triggerException( new LogProxyClientException(ErrorCode.E_USER, e)); - return; + break; } } - started.set(false); - if (connection != null) { - connection.close(); - } - thread = null; - - // TODO... if exception occurred, run handler callback - - logger.warn("!!! read thread exit !!!"); + triggerStop(); + logger.info("Client process thread exit"); }); thread.setDaemon(false); thread.start(); } + // add a shutdown hook to trigger the stop the process + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); } /** @@ -239,15 +230,14 @@ public class ClientStream { private ReconnectState reconnect() { // reconnect flag mark, tiny load for checking if (reconnect.compareAndSet(true, false)) { - logger.warn("start to reconnect..."); + logger.info("Try to reconnect"); try { if (context.config().getMaxReconnectTimes() != -1 && retryTimes >= context.config().getMaxReconnectTimes()) { logger.error( - "failed to reconnect, exceed max reconnect retry time: {}", + "Failed to reconnect, exceed max reconnect retry time: {}", context.config().getMaxReconnectTimes()); - reconnect.set(true); return ReconnectState.EXIT; } @@ -261,32 +251,30 @@ public class ClientStream { logger.warn("reconnect set checkpoint: {}", checkpointString); context.params().updateCheckpoint(checkpointString); } + connection = ConnectionFactory.instance().createConnection(context); if (connection != null) { - logger.warn("reconnect SUCC"); + logger.info("Reconnect successfully"); retryTimes = 0; - reconnect.compareAndSet(true, false); return ReconnectState.SUCCESS; } logger.error( - "failed to reconnect, retry count: {}, max: {}", + "Failed to reconnect, retry count: {}, max: {}", ++retryTimes, context.config().getMaxReconnectTimes()); // not success, retry next time reconnect.set(true); return ReconnectState.RETRY; - } catch (Exception e) { logger.error( - "failed to reconnect, retry count: {}, max: {}, message: {}", + "Failed to reconnect, retry count: {}, max: {}, message: {}", ++retryTimes, context.config().getMaxReconnectTimes(), e); // not success, retry next time reconnect.set(true); return ReconnectState.RETRY; - } finally { reconnecting.set(false); } diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java index 323c35b..ce17b2d 100644 --- a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java @@ -50,10 +50,7 @@ public class LogProxyClientTest { @Override public void onException(LogProxyClientException e) { - if (e.needStop()) { - logger.error(e.getMessage()); - client.stop(); - } + logger.error(e.getMessage()); } }); client.start(); @@ -83,10 +80,7 @@ public class LogProxyClientTest { @Override public void onException(LogProxyClientException e) { - if (e.needStop()) { - logger.error(e.getMessage()); - client.stop(); - } + logger.error(e.getMessage()); } }); client.start(); -- GitLab