...
 
Commits (5)
    https://gitcode.net/oceanbase/oblogclient/-/commit/1a92566f92171dcf06d4c03dc467912d82f2e69d update checkpoint for record queue (#40) 2022-05-19T12:13:28+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/aa5a07830cddb306a49aee66f02af579148f3313 format logger message (#39) 2022-05-19T18:29:11+08:00 He Wang wanghechn@qq.com * format logger message * revert logic change https://gitcode.net/oceanbase/oblogclient/-/commit/eb199f78ec74875e83fe584ce77e218686351df1 fix & optimize ClientStream process (#37) 2022-05-20T10:50:42+08:00 He Wang wanghechn@qq.com * optimize process thread * revert checkpoint update https://gitcode.net/oceanbase/oblogclient/-/commit/64e458601835bc034aff891d81d42d38491a78a1 [maven-release-plugin] prepare release logclient-1.0.4 2022-05-20T11:28:03+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/3f5306d36967dee1a1b31324a5d42d918f2d9482 [maven-release-plugin] prepare for next development iteration 2022-05-20T11:28:12+08:00 He Wang wanghechn@qq.com
......@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -941,7 +941,7 @@ public class LogMessage extends DataMessage.Record {
}
int j = keyStr.indexOf(')', i);
if (j == -1) {
log.error("parse key error");
log.error("Parse key error");
return null;
}
m = j;
......@@ -1077,7 +1077,7 @@ public class LogMessage extends DataMessage.Record {
}
return primaryValues;
} catch (Exception e) {
throw new RuntimeException(e);
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
......
......@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -132,8 +132,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
+ ", cluster_password=******, "
+ "tb_white_list="
+ TABLE_WHITE_LIST
+ ", tb_black_list="
+ TABLE_BLACK_LIST
+ ", start_timestamp="
+ START_TIMESTAMP;
+ START_TIMESTAMP
+ ", timezone="
+ TIME_ZONE;
}
/**
......
......@@ -226,7 +226,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes);
logger.debug("server status: {}", response.toString());
logger.debug("Server status: {}", response.toString());
state = HandshakeStateV1.PB_HEAD;
} else {
dataNotEnough = true;
......@@ -252,18 +252,18 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
*/
private void checkHeader(int version, int type, int length) {
if (ProtocolVersion.codeOf(version) == null) {
logger.error("unsupported protocol version: {}", version);
logger.error("Unsupported protocol version: {}", version);
throw new LogProxyClientException(
ErrorCode.E_PROTOCOL, "unsupported protocol version: " + version);
ErrorCode.E_PROTOCOL, "Unsupported protocol version: " + version);
}
if (HeaderType.codeOf(type) == null) {
logger.error("unsupported header type: {}", type);
logger.error("Unsupported header type: {}", type);
throw new LogProxyClientException(
ErrorCode.E_HEADER_TYPE, "unsupported header type: " + type);
ErrorCode.E_HEADER_TYPE, "Unsupported header type: " + type);
}
if (length <= 0) {
logger.error("data length equals 0");
throw new LogProxyClientException(ErrorCode.E_LEN, "data length equals 0");
logger.error("Data length equals 0");
throw new LogProxyClientException(ErrorCode.E_LEN, "Data length equals 0");
}
}
......@@ -334,6 +334,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(logMessage));
stream.setCheckpointString(logMessage.getTimestamp());
break;
} catch (InterruptedException e) {
// do nothing
......@@ -452,8 +453,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
poolFlag = false;
logger.info(
"Connect broken of ClientId: {} with LogProxy: {}",
params.info(),
"Channel closed with ClientId: {}, LogProxy: {}",
params.getClientId(),
NetworkUtil.parseRemoteAddress(ctx.channel()));
ctx.channel().disconnect();
ctx.close();
......
......@@ -32,11 +32,12 @@ public class ClientStream {
/** Flag of whether the stream is started. */
private final AtomicBoolean started = new AtomicBoolean(false);
/** The process thread. */
private Thread thread = null;
/** Context of stream. */
private StreamContext context;
private final StreamContext context;
/** Checkpoint string used to resume writing into the queue. */
private String checkpointString;
......@@ -79,10 +80,10 @@ public class ClientStream {
this.context = new StreamContext(this, clientConf, connectionParams);
}
/** Close and wait the connection. */
/** Close the connection and wait the process thread. */
public void stop() {
if (!started.compareAndSet(true, false)) {
logger.info("stopping LogProxy Client....");
if (started.compareAndSet(true, false)) {
logger.info("Try to stop this client");
if (connection != null) {
connection.close();
......@@ -91,8 +92,8 @@ public class ClientStream {
join();
thread = null;
logger.info("Client stopped successfully");
}
logger.info("stopped LogProxy Client");
}
/** Call {@link Thread#join()} method of process thread. */
......@@ -101,8 +102,8 @@ public class ClientStream {
try {
thread.join();
} catch (InterruptedException e) {
logger.warn("ClientStream thread is interrupted: " + e.getMessage());
stop();
logger.warn("Waits for process thread failed : {}", e.getMessage());
triggerStop();
}
}
}
......@@ -140,11 +141,10 @@ public class ClientStream {
while (isRunning()) {
ReconnectState state = reconnect();
if (state == ReconnectState.EXIT) {
logger.error("read thread to exit");
triggerException(
new LogProxyClientException(
ErrorCode.E_MAX_RECONNECT,
"exceed max reconnect retry"));
"Exceed max retry times"));
break;
}
if (state == ReconnectState.RETRY) {
......@@ -157,8 +157,8 @@ public class ClientStream {
continue;
}
StreamContext.TransferPacket packet;
while (true) {
StreamContext.TransferPacket packet = null;
while (isRunning()) {
try {
packet =
context.recordQueue()
......@@ -193,33 +193,24 @@ public class ClientStream {
+ packet.getType());
}
} catch (LogProxyClientException e) {
triggerStop();
triggerException(e);
return;
break;
} catch (Exception e) {
// if exception occurred, we exit
triggerStop();
triggerException(
new LogProxyClientException(ErrorCode.E_USER, e));
return;
break;
}
}
started.set(false);
if (connection != null) {
connection.close();
}
thread = null;
// TODO... if exception occurred, run handler callback
logger.warn("!!! read thread exit !!!");
triggerStop();
logger.info("Client process thread exit");
});
thread.setDaemon(false);
thread.start();
}
// add a shutdown hook to trigger the stop the process
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
/**
......@@ -239,15 +230,14 @@ public class ClientStream {
private ReconnectState reconnect() {
// reconnect flag mark, tiny load for checking
if (reconnect.compareAndSet(true, false)) {
logger.warn("start to reconnect...");
logger.info("Try to reconnect");
try {
if (context.config().getMaxReconnectTimes() != -1
&& retryTimes >= context.config().getMaxReconnectTimes()) {
logger.error(
"failed to reconnect, exceed max reconnect retry time: {}",
"Failed to reconnect, exceed max reconnect retry time: {}",
context.config().getMaxReconnectTimes());
reconnect.set(true);
return ReconnectState.EXIT;
}
......@@ -261,32 +251,30 @@ public class ClientStream {
logger.warn("reconnect set checkpoint: {}", checkpointString);
context.params().updateCheckpoint(checkpointString);
}
connection = ConnectionFactory.instance().createConnection(context);
if (connection != null) {
logger.warn("reconnect SUCC");
logger.info("Reconnect successfully");
retryTimes = 0;
reconnect.compareAndSet(true, false);
return ReconnectState.SUCCESS;
}
logger.error(
"failed to reconnect, retry count: {}, max: {}",
"Failed to reconnect, retry count: {}, max: {}",
++retryTimes,
context.config().getMaxReconnectTimes());
// not success, retry next time
reconnect.set(true);
return ReconnectState.RETRY;
} catch (Exception e) {
logger.error(
"failed to reconnect, retry count: {}, max: {}, message: {}",
"Failed to reconnect, retry count: {}, max: {}, message: {}",
++retryTimes,
context.config().getMaxReconnectTimes(),
e);
// not success, retry next time
reconnect.set(true);
return ReconnectState.RETRY;
} finally {
reconnecting.set(false);
}
......@@ -302,6 +290,15 @@ public class ClientStream {
}
}
/**
* Set checkpoint string.
*
* @param checkpointString Checkpoint string.
*/
public void setCheckpointString(String checkpointString) {
this.checkpointString = checkpointString;
}
/**
* Add a {@link RecordListener} to {@link #listeners}.
*
......
......@@ -41,7 +41,7 @@ public class Connection {
/** Close this connection. */
public void close() {
if (!closed.compareAndSet(false, true)) {
logger.warn("connection already closed");
logger.warn("Connection already closed");
}
if (channel != null) {
if (channel.isActive()) {
......@@ -49,7 +49,7 @@ public class Connection {
channel.close().addListener(this::logCloseResult).syncUninterruptibly();
} catch (Exception e) {
logger.warn(
"close connection to remote address {} exception",
"Close connection to remote address {} exception",
NetworkUtil.parseRemoteAddress(channel),
e);
}
......@@ -68,12 +68,12 @@ public class Connection {
if (future.isSuccess()) {
if (logger.isInfoEnabled()) {
logger.info(
"close connection to remote address {} success",
"Close connection to remote address {} success",
NetworkUtil.parseRemoteAddress(channel));
}
} else {
logger.warn(
"close connection to remote address {} fail",
"Close connection to remote address {} fail",
NetworkUtil.parseRemoteAddress(channel),
future.cause());
}
......
......@@ -50,10 +50,7 @@ public class LogProxyClientTest {
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
logger.error(e.getMessage());
client.stop();
}
logger.error(e.getMessage());
}
});
client.start();
......@@ -83,10 +80,7 @@ public class LogProxyClientTest {
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
logger.error(e.getMessage());
client.stop();
}
logger.error(e.getMessage());
}
});
client.start();
......
......@@ -15,7 +15,7 @@ See the Mulan PSL v2 for more details.
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.5-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.groupId}:${project.artifactId}</name>
......