...
 
Commits (8)
    https://gitcode.net/oceanbase/oblogclient/-/commit/40ea40e93f983663845872ba4ed21da786ab2272 update outdated tutorial & remove git lfs pic (#41) 2022-05-27T20:12:54+08:00 He Wang wanghechn@qq.com * update logproxy-client doc * uninstall git lfs https://gitcode.net/oceanbase/oblogclient/-/commit/973b51801fb6bfe277eb6ba4e9948d4823edd95b update version compatibility description (#42) 2022-05-28T11:07:31+08:00 He Wang wanghechn@qq.com update version desc https://gitcode.net/oceanbase/oblogclient/-/commit/21f71985e9c060e20c373d21fd72912ef9646428 add workingMode to common configs (#46) 2022-06-15T20:33:35+08:00 He Wang wanghechn@qq.com * add workingMode to common configs * modify doc https://gitcode.net/oceanbase/oblogclient/-/commit/3c94deae37640c4d7a4bd36542c198ccde8eea30 update config class for serialization (#45) 2022-06-17T15:25:49+08:00 He Wang wanghechn@qq.com * update config class for serialization * fix merged changes * fix initial order https://gitcode.net/oceanbase/oblogclient/-/commit/dbde4698a665d7caf1c88c80ca69aed8c6732878 [maven-release-plugin] prepare release logclient-1.0.5 2022-06-17T15:40:25+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/444e62ed9bedaaea71a78e04183491984637cd67 [maven-release-plugin] prepare for next development iteration 2022-06-17T15:40:30+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/1dc838eb3487c37c11986310e6cc827720fbaf7d use safeTimestamp (#48) 2022-06-21T16:32:50+08:00 He Wang wanghechn@qq.com https://gitcode.net/oceanbase/oblogclient/-/commit/33f737e7a0dec46fc0235695d3a37d6d95ab8f4a check value when update checkpoint (#49) 2022-06-23T14:07:50+08:00 He Wang wanghechn@qq.com
*.png filter=lfs diff=lfs merge=lfs -text
......@@ -6,12 +6,13 @@ OceanBase Log Client is a library for obtaining log of [OceanBase](https://githu
Getting Started
---------------
There are modules as following:
### Work with LogProxy
- `common`: some common utils
- `logproxy-client`: the client for [oblogproxy](https://github.com/oceanbase/oblogproxy)
You can use `logproxy-client` with [oblogproxy](https://github.com/oceanbase/oblogproxy) to get the commit log of OceanBase cluster, see the [tutorial](docs/quickstart/logproxy-client-tutorial.md) for more details.
You can try the `logproxy-client` following the [LogProxyClient Tutorial](docs/quickstart/logproxy-client-tutorial.md).
### Connect to OceanBase Directly
Coming soon.
Communication
---------------
......
......@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.5-SNAPSHOT</version>
<version>1.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -47,12 +47,6 @@ If you'd rather like the latest snapshots of the upcoming major version, use our
</repositories>
```
## Workflow
![image](../images/logproxy-client-workflow.png)
When `LogProxyClient.start()` is executed, a new thread will be created in `ClientStream`. The thread will initialize a netty channel which will receive log data from LogProxy and put the data as `TransferPacket` to a BlockingQueue. When the netty connection is established, the thread will poll the queue and pass the `LogMessage` in TransferPacket to `RecordListener.notify`.
## Usage
### Basic Usage
......@@ -67,21 +61,33 @@ To connect to LogProxy, there are some parameters to set in `ObReaderConfig`:
- *tb_white_list*: Table whitelist in format `tenant_name.database_name.table_name`, `*` indicates any value, and multiple values can be separated by `|`. Default is `*.*.*`.
- *tb_black_list*: Table blacklist in the same format with whitelist. Default is `|`.
- *timezone*: Timezone offset from UTC. Default value is `+8:00`.
- *working_mode*: Working mode. Can be `storage` (default value, supported from `obcdc` 3.1.3) or `memory`.
These parameters are used in `obcdc` (former `liboblog`), and the items not listed above can be passed to `obcdc` through the `ObReaderConfig` constructor with parameters.
Here is an example to set ObReaderConfig with a user of sys tenant, and the OceanBase and LogProxy server are on the same machine.
Here is an example to set `ObReaderConfig` with OceanBase Community Edition:
```java
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("user@sys");
config.setPassword("pswd");
config.setUsername("username");
config.setPassword("password");
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.db1.tb1|sys.db2.*");
config.setTableWhiteList("tenant.*.*");
```
Once ObReaderConfig is set properly, you can use it to instance a LogProxyClient and listen to the log data.
If you want to work with OceanBase Enterprise Edition, you can set the `ObReaderConfig` with `cluster_url` like below:
```java
ObReaderConfig config = new ObReaderConfig();
config.setClusterUrl("http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=alibaba&UID=ocpmaster&ObRegion=tenant");
config.setUsername("username");
config.setPassword("password");
config.setStartTimestamp(0L);
config.setTableWhiteList("tenant.*.*");
```
Once ObReaderConfig is set properly, you can use it to instance a LogProxyClient and monitor the log data.
```java
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config);
......@@ -96,10 +102,7 @@ client.addListener(new RecordListener() {
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
// add error hander here
client.stop();
}
logger.error(e.getMessage());
}
});
......@@ -109,7 +112,25 @@ client.join();
The method `LogProxyClient.start()` will start a new thread which serving with a netty socket to receive data from LogProxy.
For details about `LogMessage`, see [LogMessage](../formats/logmessage.md).
There are also some configurations for the client in `ClientConf`, if you don't want to use its default values, you can customize a `ClientConf` and pass it to the corresponding constructor to create the client instance.
```java
ClientConf clientConf =
ClientConf.builder()
.clientId("myClientId")
.transferQueueSize(1024)
.connectTimeoutMs(1000)
.readWaitTimeMs(1000)
.retryIntervalS(1)
.maxReconnectTimes(10)
.idleTimeoutS(10)
.nettyDiscardAfterReads(1)
.ignoreUnknownRecordType(true)
.build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf);
```
The received log records are parsed to `LogMessage` in the client handler, you can see [LogMessage doc](../formats/logmessage.md) for more details.
### SSL Encryption
......@@ -123,7 +144,8 @@ SslContext sslContext = SslContextBuilder.forClient()
this.getClass().getClassLoader().getResourceAsStream("client.crt"),
this.getClass().getClassLoader().getResourceAsStream("client.key"))
.build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, sslContext);
ClientConf clientConf = ClientConf.builder().sslContext(sslContext).build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf);
```
Here you need provide following files:
......@@ -133,6 +155,22 @@ Here you need provide following files:
See [manual](https://github.com/oceanbase/oblogproxy/blob/master/docs/manual.md) of LogProxy for more details about SSL encryption.
## Version Compatibility
The communication protocol between the `logproxy-client` and `oblogproxy` is forward compatible, and the latest version of `logproxy-client` can work with any version of `oblogproxy`. But for legacy versions, there are some restrictions in functionality.
#### OceanBase Enterprise Edition
To monitor change data from OceanBase EE, you need to configure `cluster_url` to replace the `rootserver_list` parameter for `obcdc`, which is supported from `1.0.4` of the client.
#### Record Compression
The log proxy compresses the record data by default from `1.0.1`, and the client fixed the bug in decompression process with [#33](https://github.com/oceanbase/oblogclient/pull/33) from `1.0.4`. So if you want to work with log proxy `1.0.1` or later version, you should use `1.0.4` or later version of the client.
#### Reuse Client Id
The log proxy use `clientId` to identify a connection, and reuse it will make the log proxy reduce the use of hardware resources. In legacy versions of the client, there is a bug [#38](https://github.com/oceanbase/oblogclient/issues/38) which will cause connection close failure, and it's fixed in `1.0.4`. So you can only reuse a fixed `clientId` from `1.0.4` of the client.
## Heartbeat and Troubleshooting
Once the connection is established properly, LogProxy will start to fetch log messages from OceanBase and send them to LogProxyClient. When the connection is idle, LogProxy will send heartbeat messages to LogProxyClient.
......
......@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.5-SNAPSHOT</version>
<version>1.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -13,6 +13,7 @@ package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.util.TypeTrait;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
......@@ -21,7 +22,7 @@ import java.util.Map.Entry;
public abstract class AbstractConnectionConfig implements ConnectionConfig {
/** Defined configurations map. */
protected static Map<String, ConfigItem<Object>> configs = new HashMap<>();
protected final Map<String, ConfigItem<Object>> configs = new HashMap<>();
/** Extra configurations map. */
protected final Map<String, String> extraConfigs = new HashMap<>();
......@@ -32,7 +33,10 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
* @param <T> The type of stored value.
*/
@SuppressWarnings("unchecked")
protected static class ConfigItem<T> {
protected class ConfigItem<T> implements Serializable {
private static final long serialVersionUID = 1L;
protected String key;
protected T val;
......@@ -73,11 +77,11 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
}
/**
* Sole constructor.
* Set configs.
*
* @param allConfigs The map of configurations.
*/
public AbstractConnectionConfig(Map<String, String> allConfigs) {
public void setConfigs(Map<String, String> allConfigs) {
if (allConfigs != null) {
for (Entry<String, String> entry : allConfigs.entrySet()) {
if (!configs.containsKey(entry.getKey())) {
......
......@@ -14,11 +14,15 @@ package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.common.config.SharedConf;
import io.netty.handler.ssl.SslContext;
import java.io.Serializable;
/** The class that defines the constants that are used to generate the connection. */
public class ClientConf extends SharedConf {
public class ClientConf extends SharedConf implements Serializable {
private static final long serialVersionUID = 1L;
/** Client version. */
public static final String VERSION = "1.0.1";
public static final String VERSION = "1.0.5";
/** Queue size for storing records received from log proxy. */
private final int transferQueueSize;
......
......@@ -10,8 +10,12 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import java.io.Serializable;
import java.util.Map;
/** This is the interface of connection config. */
public interface ConnectionConfig {
public interface ConnectionConfig extends Serializable {
/**
* Generate a configuration string from connection parameters.
......@@ -20,6 +24,14 @@ public interface ConnectionConfig {
*/
String generateConfigurationString();
/**
* Generate a configuration map from connection parameters.
*
* @param encryptPassword The flag of whether encrypt the password.
* @return The configuration map.
*/
Map<String, String> generateConfigurationMap(boolean encryptPassword);
/**
* Update the checkpoint.
*
......
......@@ -24,40 +24,40 @@ import org.slf4j.LoggerFactory;
/** This is a configuration class for connection to log proxy. */
public class ObReaderConfig extends AbstractConnectionConfig {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class);
/** Cluster config url. */
private static final ConfigItem<String> CLUSTER_URL = new ConfigItem<>("cluster_url", "");
private final ConfigItem<String> clusterUrl = new ConfigItem<>("cluster_url", "");
/** Root server list. */
private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", "");
private final ConfigItem<String> rsList = new ConfigItem<>("rootserver_list", "");
/** Cluster username. */
private static final ConfigItem<String> CLUSTER_USER = new ConfigItem<>("cluster_user", "");
private final ConfigItem<String> clusterUser = new ConfigItem<>("cluster_user", "");
/** Cluster password. */
private static final ConfigItem<String> CLUSTER_PASSWORD =
new ConfigItem<>("cluster_password", "");
private final ConfigItem<String> clusterPassword = new ConfigItem<>("cluster_password", "");
/** Table whitelist. */
private static final ConfigItem<String> TABLE_WHITE_LIST =
new ConfigItem<>("tb_white_list", "*.*.*");
private final ConfigItem<String> tableWhitelist = new ConfigItem<>("tb_white_list", "*.*.*");
/** Table blacklist. */
private static final ConfigItem<String> TABLE_BLACK_LIST =
new ConfigItem<>("tb_black_list", "|");
private final ConfigItem<String> tableBlacklist = new ConfigItem<>("tb_black_list", "|");
/** Start timestamp. */
private static final ConfigItem<Long> START_TIMESTAMP =
new ConfigItem<>("first_start_timestamp", 0L);
private final ConfigItem<Long> startTimestamp = new ConfigItem<>("first_start_timestamp", 0L);
/** Timezone offset. */
private static final ConfigItem<String> TIME_ZONE = new ConfigItem<>("timezone", "+8:00");
private final ConfigItem<String> timezone = new ConfigItem<>("timezone", "+8:00");
/** Working mode. */
private final ConfigItem<String> workingMode = new ConfigItem<>("working_mode", "storage");
/** Constructor with empty arguments. */
public ObReaderConfig() {
super(new HashMap<>());
}
public ObReaderConfig() {}
/**
* Constructor with a config map.
......@@ -65,7 +65,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param allConfigs Config map.
*/
public ObReaderConfig(Map<String, String> allConfigs) {
super(allConfigs);
setConfigs(allConfigs);
}
@Override
......@@ -76,12 +76,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override
public boolean valid() {
try {
if (StringUtils.isEmpty(CLUSTER_URL.val) && StringUtils.isEmpty(RS_LIST.val)) {
if (StringUtils.isEmpty(clusterUrl.val) && StringUtils.isEmpty(rsList.val)) {
throw new IllegalArgumentException("empty clusterUrl or rsList");
}
Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser");
Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword");
if (START_TIMESTAMP.val < 0L) {
Validator.notEmpty(clusterUser.val, "invalid clusterUser");
Validator.notEmpty(clusterPassword.val, "invalid clusterPassword");
if (startTimestamp.val < 0L) {
throw new IllegalArgumentException("invalid startTimestamp");
}
return true;
......@@ -98,10 +98,10 @@ public class ObReaderConfig extends AbstractConnectionConfig {
String value = entry.getValue().val.toString();
// Empty `cluster_url` should be discarded, otherwise the server will
// use it as a valid value by mistake.
if (CLUSTER_URL.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
if (clusterPassword.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
sb.append(entry.getKey()).append("=").append(value).append(" ");
......@@ -113,10 +113,36 @@ public class ObReaderConfig extends AbstractConnectionConfig {
return sb.toString();
}
@Override
public Map<String, String> generateConfigurationMap(boolean encrypt_password) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString();
// Empty `cluster_url` should be discarded, otherwise the server will
// use it as a valid value by mistake.
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (encrypt_password
&& clusterPassword.key.equals(entry.getKey())
&& SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
result.put(entry.getKey(), value);
}
result.putAll(extraConfigs);
return result;
}
@Override
public void updateCheckpoint(String checkpoint) {
try {
START_TIMESTAMP.set(Long.parseLong(checkpoint));
long timestamp = Long.parseLong(checkpoint);
if (timestamp < 0) {
throw new IllegalArgumentException(
"update checkpoint with invalid value: " + checkpoint);
}
startTimestamp.set(timestamp);
} catch (NumberFormatException e) {
// do nothing
}
......@@ -124,20 +150,22 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override
public String toString() {
return (StringUtils.isNotEmpty(CLUSTER_URL.val))
? ("cluster_url=" + CLUSTER_URL)
: ("rootserver_list=" + RS_LIST)
return (StringUtils.isNotEmpty(clusterUrl.val))
? ("cluster_url=" + clusterUrl)
: ("rootserver_list=" + rsList)
+ ", cluster_user="
+ CLUSTER_USER
+ clusterUser
+ ", cluster_password=******, "
+ "tb_white_list="
+ TABLE_WHITE_LIST
+ tableWhitelist
+ ", tb_black_list="
+ TABLE_BLACK_LIST
+ tableBlacklist
+ ", start_timestamp="
+ START_TIMESTAMP
+ startTimestamp
+ ", timezone="
+ TIME_ZONE;
+ timezone
+ ", working_mode="
+ workingMode;
}
/**
......@@ -146,7 +174,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterUrl Cluster config url.
*/
public void setClusterUrl(String clusterUrl) {
CLUSTER_URL.set(clusterUrl);
this.clusterUrl.set(clusterUrl);
}
/**
......@@ -155,7 +183,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param rsList Root server list.
*/
public void setRsList(String rsList) {
RS_LIST.set(rsList);
this.rsList.set(rsList);
}
/**
......@@ -164,7 +192,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterUser Cluster username.
*/
public void setUsername(String clusterUser) {
CLUSTER_USER.set(clusterUser);
this.clusterUser.set(clusterUser);
}
/**
......@@ -173,7 +201,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterPassword Cluster password.
*/
public void setPassword(String clusterPassword) {
CLUSTER_PASSWORD.set(clusterPassword);
this.clusterPassword.set(clusterPassword);
}
/**
......@@ -183,7 +211,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param tableWhiteList Table whitelist.
*/
public void setTableWhiteList(String tableWhiteList) {
TABLE_WHITE_LIST.set(tableWhiteList);
tableWhitelist.set(tableWhiteList);
}
/**
......@@ -192,7 +220,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param tableBlackList Table blacklist.
*/
public void setTableBlackList(String tableBlackList) {
TABLE_BLACK_LIST.set(tableBlackList);
tableBlacklist.set(tableBlackList);
}
/**
......@@ -201,7 +229,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param startTimestamp Start timestamp.
*/
public void setStartTimestamp(Long startTimestamp) {
START_TIMESTAMP.set(startTimestamp);
this.startTimestamp.set(startTimestamp);
}
/**
......@@ -210,6 +238,15 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param timezone Timezone offset from UTC, the value is `+8:00` by default.
*/
public void setTimezone(String timezone) {
TIME_ZONE.set(timezone);
this.timezone.set(timezone);
}
/**
* Set working mode.
*
* @param workingMode Working mode, can be 'memory' or 'storage'.
*/
public void setWorkingMode(String workingMode) {
this.workingMode.set(workingMode);
}
}
......@@ -331,13 +331,19 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
throw new LogProxyClientException(ErrorCode.E_PARSE, e);
}
if (logger.isTraceEnabled()) {
logger.trace("Log message: {}", logMessage);
}
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(logMessage));
stream.setCheckpointString(logMessage.getTimestamp());
stream.setCheckpointString(logMessage.getSafeTimestamp());
break;
} catch (InterruptedException e) {
// do nothing
} catch (IllegalArgumentException e) {
logger.error("Failed to update checkpoint for log message: " + logMessage, e);
}
}
......
......@@ -296,7 +296,14 @@ public class ClientStream {
* @param checkpointString Checkpoint string.
*/
public void setCheckpointString(String checkpointString) {
this.checkpointString = checkpointString;
long timestamp = Long.parseLong(checkpointString);
if (timestamp < 0) {
throw new IllegalArgumentException(
"Update checkpoint with invalid value: " + timestamp);
}
if (this.checkpointString == null || Long.parseLong(this.checkpointString) < timestamp) {
this.checkpointString = checkpointString;
}
}
/**
......
......@@ -13,13 +13,19 @@
package com.oceanbase.clogproxy.client.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.junit.Assert;
import org.junit.Test;
public class ClientConfTest {
@Test
public void builderTest() {
public void testBuilderDefaultValues() {
ClientConf clientConf = ClientConf.builder().build();
Assert.assertEquals(clientConf.getTransferQueueSize(), 20000);
Assert.assertEquals(clientConf.getConnectTimeoutMs(), 5000);
......@@ -32,4 +38,21 @@ public class ClientConfTest {
Assert.assertFalse(clientConf.isIgnoreUnknownRecordType());
Assert.assertNull(clientConf.getSslContext());
}
@Test
public void testSerialization() throws IOException, ClassNotFoundException {
ClientConf clientConf = ClientConf.builder().build();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(clientConf);
outputStream.flush();
outputStream.close();
ObjectInputStream inputStream =
new ObjectInputStream(
new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
Object object = inputStream.readObject();
Assert.assertTrue(object instanceof ClientConf);
Assert.assertTrue(EqualsBuilder.reflectionEquals(object, clientConf));
}
}
/*
* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
* oblogclient is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package com.oceanbase.clogproxy.client.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
public class ObReaderConfigTest {
private static ObReaderConfig generateTestConfig() {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@test_tenant");
config.setPassword("password");
config.setStartTimestamp(0L);
config.setTableWhiteList("test_tenant.test.*");
config.setTableBlackList("|");
config.setTimezone("+8:00");
config.setWorkingMode("storage");
return config;
}
@Test
public void testSerialization() throws IOException, ClassNotFoundException {
ObReaderConfig config = generateTestConfig();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(config);
outputStream.flush();
outputStream.close();
ObjectInputStream inputStream =
new ObjectInputStream(
new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
Object object = inputStream.readObject();
Assert.assertTrue(object instanceof ObReaderConfig);
Map<String, String> configMap = ((ObReaderConfig) object).generateConfigurationMap(false);
Assert.assertEquals(configMap.size(), 8);
Assert.assertEquals(configMap, config.generateConfigurationMap(false));
}
}
......@@ -15,7 +15,7 @@ See the Mulan PSL v2 for more details.
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.5-SNAPSHOT</version>
<version>1.0.6-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.groupId}:${project.artifactId}</name>
......