...
 
Commits (5)
    https://gitcode.net/oceanbase/oblogclient/-/commit/1a92566f92171dcf06d4c03dc467912d82f2e69d update checkpoint for record queue (#40) 2022-05-19T12:13:28+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/aa5a07830cddb306a49aee66f02af579148f3313 format logger message (#39) 2022-05-19T18:29:11+08:00 He Wang wanghechn@qq.com * format logger message * revert logic change https://gitcode.net/oceanbase/oblogclient/-/commit/eb199f78ec74875e83fe584ce77e218686351df1 fix & optimize ClientStream process (#37) 2022-05-20T10:50:42+08:00 He Wang wanghechn@qq.com * optimize process thread * revert checkpoint update https://gitcode.net/oceanbase/oblogclient/-/commit/64e458601835bc034aff891d81d42d38491a78a1 [maven-release-plugin] prepare release logclient-1.0.4 2022-05-20T11:28:03+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/3f5306d36967dee1a1b31324a5d42d918f2d9482 [maven-release-plugin] prepare for next development iteration 2022-05-20T11:28:12+08:00 He Wang wanghechn@qq.com
...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details. ...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent> <parent>
<groupId>com.oceanbase.logclient</groupId> <groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId> <artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.0.5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
......
...@@ -941,7 +941,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -941,7 +941,7 @@ public class LogMessage extends DataMessage.Record {
} }
int j = keyStr.indexOf(')', i); int j = keyStr.indexOf(')', i);
if (j == -1) { if (j == -1) {
log.error("parse key error"); log.error("Parse key error");
return null; return null;
} }
m = j; m = j;
...@@ -1077,7 +1077,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -1077,7 +1077,7 @@ public class LogMessage extends DataMessage.Record {
} }
return primaryValues; return primaryValues;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new LogMessageException(e.getMessage(), e.getCause());
} }
} }
......
...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details. ...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent> <parent>
<groupId>com.oceanbase.logclient</groupId> <groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId> <artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.0.5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
......
...@@ -132,8 +132,12 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -132,8 +132,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
+ ", cluster_password=******, " + ", cluster_password=******, "
+ "tb_white_list=" + "tb_white_list="
+ TABLE_WHITE_LIST + TABLE_WHITE_LIST
+ ", tb_black_list="
+ TABLE_BLACK_LIST
+ ", start_timestamp=" + ", start_timestamp="
+ START_TIMESTAMP; + START_TIMESTAMP
+ ", timezone="
+ TIME_ZONE;
} }
/** /**
......
...@@ -226,7 +226,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -226,7 +226,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
byte[] bytes = new byte[dataLength]; byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes); buffer.readBytes(bytes);
LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes); LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes);
logger.debug("server status: {}", response.toString()); logger.debug("Server status: {}", response.toString());
state = HandshakeStateV1.PB_HEAD; state = HandshakeStateV1.PB_HEAD;
} else { } else {
dataNotEnough = true; dataNotEnough = true;
...@@ -252,18 +252,18 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -252,18 +252,18 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
*/ */
private void checkHeader(int version, int type, int length) { private void checkHeader(int version, int type, int length) {
if (ProtocolVersion.codeOf(version) == null) { if (ProtocolVersion.codeOf(version) == null) {
logger.error("unsupported protocol version: {}", version); logger.error("Unsupported protocol version: {}", version);
throw new LogProxyClientException( throw new LogProxyClientException(
ErrorCode.E_PROTOCOL, "unsupported protocol version: " + version); ErrorCode.E_PROTOCOL, "Unsupported protocol version: " + version);
} }
if (HeaderType.codeOf(type) == null) { if (HeaderType.codeOf(type) == null) {
logger.error("unsupported header type: {}", type); logger.error("Unsupported header type: {}", type);
throw new LogProxyClientException( throw new LogProxyClientException(
ErrorCode.E_HEADER_TYPE, "unsupported header type: " + type); ErrorCode.E_HEADER_TYPE, "Unsupported header type: " + type);
} }
if (length <= 0) { if (length <= 0) {
logger.error("data length equals 0"); logger.error("Data length equals 0");
throw new LogProxyClientException(ErrorCode.E_LEN, "data length equals 0"); throw new LogProxyClientException(ErrorCode.E_LEN, "Data length equals 0");
} }
} }
...@@ -334,6 +334,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -334,6 +334,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
while (true) { while (true) {
try { try {
recordQueue.put(new StreamContext.TransferPacket(logMessage)); recordQueue.put(new StreamContext.TransferPacket(logMessage));
stream.setCheckpointString(logMessage.getTimestamp());
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// do nothing // do nothing
...@@ -452,8 +453,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -452,8 +453,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
poolFlag = false; poolFlag = false;
logger.info( logger.info(
"Connect broken of ClientId: {} with LogProxy: {}", "Channel closed with ClientId: {}, LogProxy: {}",
params.info(), params.getClientId(),
NetworkUtil.parseRemoteAddress(ctx.channel())); NetworkUtil.parseRemoteAddress(ctx.channel()));
ctx.channel().disconnect(); ctx.channel().disconnect();
ctx.close(); ctx.close();
......
...@@ -32,11 +32,12 @@ public class ClientStream { ...@@ -32,11 +32,12 @@ public class ClientStream {
/** Flag of whether the stream is started. */ /** Flag of whether the stream is started. */
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
/** The process thread. */ /** The process thread. */
private Thread thread = null; private Thread thread = null;
/** Context of stream. */ /** Context of stream. */
private StreamContext context; private final StreamContext context;
/** Checkpoint string used to resume writing into the queue. */ /** Checkpoint string used to resume writing into the queue. */
private String checkpointString; private String checkpointString;
...@@ -79,10 +80,10 @@ public class ClientStream { ...@@ -79,10 +80,10 @@ public class ClientStream {
this.context = new StreamContext(this, clientConf, connectionParams); this.context = new StreamContext(this, clientConf, connectionParams);
} }
/** Close and wait the connection. */ /** Close the connection and wait the process thread. */
public void stop() { public void stop() {
if (!started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
logger.info("stopping LogProxy Client...."); logger.info("Try to stop this client");
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
...@@ -91,8 +92,8 @@ public class ClientStream { ...@@ -91,8 +92,8 @@ public class ClientStream {
join(); join();
thread = null; thread = null;
logger.info("Client stopped successfully");
} }
logger.info("stopped LogProxy Client");
} }
/** Call {@link Thread#join()} method of process thread. */ /** Call {@link Thread#join()} method of process thread. */
...@@ -101,8 +102,8 @@ public class ClientStream { ...@@ -101,8 +102,8 @@ public class ClientStream {
try { try {
thread.join(); thread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("ClientStream thread is interrupted: " + e.getMessage()); logger.warn("Waits for process thread failed : {}", e.getMessage());
stop(); triggerStop();
} }
} }
} }
...@@ -140,11 +141,10 @@ public class ClientStream { ...@@ -140,11 +141,10 @@ public class ClientStream {
while (isRunning()) { while (isRunning()) {
ReconnectState state = reconnect(); ReconnectState state = reconnect();
if (state == ReconnectState.EXIT) { if (state == ReconnectState.EXIT) {
logger.error("read thread to exit");
triggerException( triggerException(
new LogProxyClientException( new LogProxyClientException(
ErrorCode.E_MAX_RECONNECT, ErrorCode.E_MAX_RECONNECT,
"exceed max reconnect retry")); "Exceed max retry times"));
break; break;
} }
if (state == ReconnectState.RETRY) { if (state == ReconnectState.RETRY) {
...@@ -157,8 +157,8 @@ public class ClientStream { ...@@ -157,8 +157,8 @@ public class ClientStream {
continue; continue;
} }
StreamContext.TransferPacket packet; StreamContext.TransferPacket packet = null;
while (true) { while (isRunning()) {
try { try {
packet = packet =
context.recordQueue() context.recordQueue()
...@@ -193,33 +193,24 @@ public class ClientStream { ...@@ -193,33 +193,24 @@ public class ClientStream {
+ packet.getType()); + packet.getType());
} }
} catch (LogProxyClientException e) { } catch (LogProxyClientException e) {
triggerStop();
triggerException(e); triggerException(e);
return; break;
} catch (Exception e) { } catch (Exception e) {
// if exception occurred, we exit
triggerStop();
triggerException( triggerException(
new LogProxyClientException(ErrorCode.E_USER, e)); new LogProxyClientException(ErrorCode.E_USER, e));
return; break;
} }
} }
started.set(false); triggerStop();
if (connection != null) { logger.info("Client process thread exit");
connection.close();
}
thread = null;
// TODO... if exception occurred, run handler callback
logger.warn("!!! read thread exit !!!");
}); });
thread.setDaemon(false); thread.setDaemon(false);
thread.start(); thread.start();
} }
// add a shutdown hook to trigger the stop the process
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
} }
/** /**
...@@ -239,15 +230,14 @@ public class ClientStream { ...@@ -239,15 +230,14 @@ public class ClientStream {
private ReconnectState reconnect() { private ReconnectState reconnect() {
// reconnect flag mark, tiny load for checking // reconnect flag mark, tiny load for checking
if (reconnect.compareAndSet(true, false)) { if (reconnect.compareAndSet(true, false)) {
logger.warn("start to reconnect..."); logger.info("Try to reconnect");
try { try {
if (context.config().getMaxReconnectTimes() != -1 if (context.config().getMaxReconnectTimes() != -1
&& retryTimes >= context.config().getMaxReconnectTimes()) { && retryTimes >= context.config().getMaxReconnectTimes()) {
logger.error( logger.error(
"failed to reconnect, exceed max reconnect retry time: {}", "Failed to reconnect, exceed max reconnect retry time: {}",
context.config().getMaxReconnectTimes()); context.config().getMaxReconnectTimes());
reconnect.set(true);
return ReconnectState.EXIT; return ReconnectState.EXIT;
} }
...@@ -261,32 +251,30 @@ public class ClientStream { ...@@ -261,32 +251,30 @@ public class ClientStream {
logger.warn("reconnect set checkpoint: {}", checkpointString); logger.warn("reconnect set checkpoint: {}", checkpointString);
context.params().updateCheckpoint(checkpointString); context.params().updateCheckpoint(checkpointString);
} }
connection = ConnectionFactory.instance().createConnection(context); connection = ConnectionFactory.instance().createConnection(context);
if (connection != null) { if (connection != null) {
logger.warn("reconnect SUCC"); logger.info("Reconnect successfully");
retryTimes = 0; retryTimes = 0;
reconnect.compareAndSet(true, false);
return ReconnectState.SUCCESS; return ReconnectState.SUCCESS;
} }
logger.error( logger.error(
"failed to reconnect, retry count: {}, max: {}", "Failed to reconnect, retry count: {}, max: {}",
++retryTimes, ++retryTimes,
context.config().getMaxReconnectTimes()); context.config().getMaxReconnectTimes());
// not success, retry next time // not success, retry next time
reconnect.set(true); reconnect.set(true);
return ReconnectState.RETRY; return ReconnectState.RETRY;
} catch (Exception e) { } catch (Exception e) {
logger.error( logger.error(
"failed to reconnect, retry count: {}, max: {}, message: {}", "Failed to reconnect, retry count: {}, max: {}, message: {}",
++retryTimes, ++retryTimes,
context.config().getMaxReconnectTimes(), context.config().getMaxReconnectTimes(),
e); e);
// not success, retry next time // not success, retry next time
reconnect.set(true); reconnect.set(true);
return ReconnectState.RETRY; return ReconnectState.RETRY;
} finally { } finally {
reconnecting.set(false); reconnecting.set(false);
} }
...@@ -302,6 +290,15 @@ public class ClientStream { ...@@ -302,6 +290,15 @@ public class ClientStream {
} }
} }
/**
* Set checkpoint string.
*
* @param checkpointString Checkpoint string.
*/
public void setCheckpointString(String checkpointString) {
this.checkpointString = checkpointString;
}
/** /**
* Add a {@link RecordListener} to {@link #listeners}. * Add a {@link RecordListener} to {@link #listeners}.
* *
......
...@@ -41,7 +41,7 @@ public class Connection { ...@@ -41,7 +41,7 @@ public class Connection {
/** Close this connection. */ /** Close this connection. */
public void close() { public void close() {
if (!closed.compareAndSet(false, true)) { if (!closed.compareAndSet(false, true)) {
logger.warn("connection already closed"); logger.warn("Connection already closed");
} }
if (channel != null) { if (channel != null) {
if (channel.isActive()) { if (channel.isActive()) {
...@@ -49,7 +49,7 @@ public class Connection { ...@@ -49,7 +49,7 @@ public class Connection {
channel.close().addListener(this::logCloseResult).syncUninterruptibly(); channel.close().addListener(this::logCloseResult).syncUninterruptibly();
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"close connection to remote address {} exception", "Close connection to remote address {} exception",
NetworkUtil.parseRemoteAddress(channel), NetworkUtil.parseRemoteAddress(channel),
e); e);
} }
...@@ -68,12 +68,12 @@ public class Connection { ...@@ -68,12 +68,12 @@ public class Connection {
if (future.isSuccess()) { if (future.isSuccess()) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info( logger.info(
"close connection to remote address {} success", "Close connection to remote address {} success",
NetworkUtil.parseRemoteAddress(channel)); NetworkUtil.parseRemoteAddress(channel));
} }
} else { } else {
logger.warn( logger.warn(
"close connection to remote address {} fail", "Close connection to remote address {} fail",
NetworkUtil.parseRemoteAddress(channel), NetworkUtil.parseRemoteAddress(channel),
future.cause()); future.cause());
} }
......
...@@ -50,10 +50,7 @@ public class LogProxyClientTest { ...@@ -50,10 +50,7 @@ public class LogProxyClientTest {
@Override @Override
public void onException(LogProxyClientException e) { public void onException(LogProxyClientException e) {
if (e.needStop()) { logger.error(e.getMessage());
logger.error(e.getMessage());
client.stop();
}
} }
}); });
client.start(); client.start();
...@@ -83,10 +80,7 @@ public class LogProxyClientTest { ...@@ -83,10 +80,7 @@ public class LogProxyClientTest {
@Override @Override
public void onException(LogProxyClientException e) { public void onException(LogProxyClientException e) {
if (e.needStop()) { logger.error(e.getMessage());
logger.error(e.getMessage());
client.stop();
}
} }
}); });
client.start(); client.start();
......
...@@ -15,7 +15,7 @@ See the Mulan PSL v2 for more details. ...@@ -15,7 +15,7 @@ See the Mulan PSL v2 for more details.
<groupId>com.oceanbase.logclient</groupId> <groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId> <artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.0.5-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>${project.groupId}:${project.artifactId}</name> <name>${project.groupId}:${project.artifactId}</name>
......