未验证 提交 3c94deae 编写于 作者: H He Wang 提交者: GitHub

update config class for serialization (#45)

* update config class for serialization

* fix merged changes

* fix initial order
上级 21f71985
......@@ -13,6 +13,7 @@ package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.util.TypeTrait;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
......@@ -21,7 +22,7 @@ import java.util.Map.Entry;
public abstract class AbstractConnectionConfig implements ConnectionConfig {
/** Defined configurations map. */
protected static Map<String, ConfigItem<Object>> configs = new HashMap<>();
protected final Map<String, ConfigItem<Object>> configs = new HashMap<>();
/** Extra configurations map. */
protected final Map<String, String> extraConfigs = new HashMap<>();
......@@ -32,7 +33,10 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
* @param <T> The type of stored value.
*/
@SuppressWarnings("unchecked")
protected static class ConfigItem<T> {
protected class ConfigItem<T> implements Serializable {
private static final long serialVersionUID = 1L;
protected String key;
protected T val;
......@@ -73,11 +77,11 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
}
/**
* Sole constructor.
* Set configs.
*
* @param allConfigs The map of configurations.
*/
public AbstractConnectionConfig(Map<String, String> allConfigs) {
public void setConfigs(Map<String, String> allConfigs) {
if (allConfigs != null) {
for (Entry<String, String> entry : allConfigs.entrySet()) {
if (!configs.containsKey(entry.getKey())) {
......
......@@ -14,11 +14,15 @@ package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.common.config.SharedConf;
import io.netty.handler.ssl.SslContext;
import java.io.Serializable;
/** The class that defines the constants that are used to generate the connection. */
public class ClientConf extends SharedConf {
public class ClientConf extends SharedConf implements Serializable {
private static final long serialVersionUID = 1L;
/** Client version. */
public static final String VERSION = "1.0.1";
public static final String VERSION = "1.0.5";
/** Queue size for storing records received from log proxy. */
private final int transferQueueSize;
......
......@@ -10,8 +10,12 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import java.io.Serializable;
import java.util.Map;
/** This is the interface of connection config. */
public interface ConnectionConfig {
public interface ConnectionConfig extends Serializable {
/**
* Generate a configuration string from connection parameters.
......@@ -20,6 +24,14 @@ public interface ConnectionConfig {
*/
String generateConfigurationString();
/**
* Generate a configuration map from connection parameters.
*
* @param encryptPassword The flag of whether encrypt the password.
* @return The configuration map.
*/
Map<String, String> generateConfigurationMap(boolean encryptPassword);
/**
* Update the checkpoint.
*
......
......@@ -24,44 +24,40 @@ import org.slf4j.LoggerFactory;
/** This is a configuration class for connection to log proxy. */
public class ObReaderConfig extends AbstractConnectionConfig {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class);
/** Cluster config url. */
private static final ConfigItem<String> CLUSTER_URL = new ConfigItem<>("cluster_url", "");
private final ConfigItem<String> clusterUrl = new ConfigItem<>("cluster_url", "");
/** Root server list. */
private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", "");
private final ConfigItem<String> rsList = new ConfigItem<>("rootserver_list", "");
/** Cluster username. */
private static final ConfigItem<String> CLUSTER_USER = new ConfigItem<>("cluster_user", "");
private final ConfigItem<String> clusterUser = new ConfigItem<>("cluster_user", "");
/** Cluster password. */
private static final ConfigItem<String> CLUSTER_PASSWORD =
new ConfigItem<>("cluster_password", "");
private final ConfigItem<String> clusterPassword = new ConfigItem<>("cluster_password", "");
/** Table whitelist. */
private static final ConfigItem<String> TABLE_WHITE_LIST =
new ConfigItem<>("tb_white_list", "*.*.*");
private final ConfigItem<String> tableWhitelist = new ConfigItem<>("tb_white_list", "*.*.*");
/** Table blacklist. */
private static final ConfigItem<String> TABLE_BLACK_LIST =
new ConfigItem<>("tb_black_list", "|");
private final ConfigItem<String> tableBlacklist = new ConfigItem<>("tb_black_list", "|");
/** Start timestamp. */
private static final ConfigItem<Long> START_TIMESTAMP =
new ConfigItem<>("first_start_timestamp", 0L);
private final ConfigItem<Long> startTimestamp = new ConfigItem<>("first_start_timestamp", 0L);
/** Timezone offset. */
private static final ConfigItem<String> TIME_ZONE = new ConfigItem<>("timezone", "+8:00");
private final ConfigItem<String> timezone = new ConfigItem<>("timezone", "+8:00");
/** Working mode. */
private static final ConfigItem<String> WORKING_MODE =
new ConfigItem<>("working_mode", "storage");
private final ConfigItem<String> workingMode = new ConfigItem<>("working_mode", "storage");
/** Constructor with empty arguments. */
public ObReaderConfig() {
super(new HashMap<>());
}
public ObReaderConfig() {}
/**
* Constructor with a config map.
......@@ -69,7 +65,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param allConfigs Config map.
*/
public ObReaderConfig(Map<String, String> allConfigs) {
super(allConfigs);
setConfigs(allConfigs);
}
@Override
......@@ -80,12 +76,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override
public boolean valid() {
try {
if (StringUtils.isEmpty(CLUSTER_URL.val) && StringUtils.isEmpty(RS_LIST.val)) {
if (StringUtils.isEmpty(clusterUrl.val) && StringUtils.isEmpty(rsList.val)) {
throw new IllegalArgumentException("empty clusterUrl or rsList");
}
Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser");
Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword");
if (START_TIMESTAMP.val < 0L) {
Validator.notEmpty(clusterUser.val, "invalid clusterUser");
Validator.notEmpty(clusterPassword.val, "invalid clusterPassword");
if (startTimestamp.val < 0L) {
throw new IllegalArgumentException("invalid startTimestamp");
}
return true;
......@@ -102,10 +98,10 @@ public class ObReaderConfig extends AbstractConnectionConfig {
String value = entry.getValue().val.toString();
// Empty `cluster_url` should be discarded, otherwise the server will
// use it as a valid value by mistake.
if (CLUSTER_URL.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
if (clusterPassword.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
sb.append(entry.getKey()).append("=").append(value).append(" ");
......@@ -117,10 +113,31 @@ public class ObReaderConfig extends AbstractConnectionConfig {
return sb.toString();
}
@Override
public Map<String, String> generateConfigurationMap(boolean encrypt_password) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString();
// Empty `cluster_url` should be discarded, otherwise the server will
// use it as a valid value by mistake.
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (encrypt_password
&& clusterPassword.key.equals(entry.getKey())
&& SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
result.put(entry.getKey(), value);
}
result.putAll(extraConfigs);
return result;
}
@Override
public void updateCheckpoint(String checkpoint) {
try {
START_TIMESTAMP.set(Long.parseLong(checkpoint));
startTimestamp.set(Long.parseLong(checkpoint));
} catch (NumberFormatException e) {
// do nothing
}
......@@ -128,20 +145,22 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override
public String toString() {
return (StringUtils.isNotEmpty(CLUSTER_URL.val))
? ("cluster_url=" + CLUSTER_URL)
: ("rootserver_list=" + RS_LIST)
return (StringUtils.isNotEmpty(clusterUrl.val))
? ("cluster_url=" + clusterUrl)
: ("rootserver_list=" + rsList)
+ ", cluster_user="
+ CLUSTER_USER
+ clusterUser
+ ", cluster_password=******, "
+ "tb_white_list="
+ TABLE_WHITE_LIST
+ tableWhitelist
+ ", tb_black_list="
+ TABLE_BLACK_LIST
+ tableBlacklist
+ ", start_timestamp="
+ START_TIMESTAMP
+ startTimestamp
+ ", timezone="
+ TIME_ZONE;
+ timezone
+ ", working_mode="
+ workingMode;
}
/**
......@@ -150,7 +169,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterUrl Cluster config url.
*/
public void setClusterUrl(String clusterUrl) {
CLUSTER_URL.set(clusterUrl);
this.clusterUrl.set(clusterUrl);
}
/**
......@@ -159,7 +178,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param rsList Root server list.
*/
public void setRsList(String rsList) {
RS_LIST.set(rsList);
this.rsList.set(rsList);
}
/**
......@@ -168,7 +187,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterUser Cluster username.
*/
public void setUsername(String clusterUser) {
CLUSTER_USER.set(clusterUser);
this.clusterUser.set(clusterUser);
}
/**
......@@ -177,7 +196,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param clusterPassword Cluster password.
*/
public void setPassword(String clusterPassword) {
CLUSTER_PASSWORD.set(clusterPassword);
this.clusterPassword.set(clusterPassword);
}
/**
......@@ -187,7 +206,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param tableWhiteList Table whitelist.
*/
public void setTableWhiteList(String tableWhiteList) {
TABLE_WHITE_LIST.set(tableWhiteList);
tableWhitelist.set(tableWhiteList);
}
/**
......@@ -196,7 +215,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param tableBlackList Table blacklist.
*/
public void setTableBlackList(String tableBlackList) {
TABLE_BLACK_LIST.set(tableBlackList);
tableBlacklist.set(tableBlackList);
}
/**
......@@ -205,7 +224,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param startTimestamp Start timestamp.
*/
public void setStartTimestamp(Long startTimestamp) {
START_TIMESTAMP.set(startTimestamp);
this.startTimestamp.set(startTimestamp);
}
/**
......@@ -214,7 +233,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param timezone Timezone offset from UTC, the value is `+8:00` by default.
*/
public void setTimezone(String timezone) {
TIME_ZONE.set(timezone);
this.timezone.set(timezone);
}
/**
......@@ -223,6 +242,6 @@ public class ObReaderConfig extends AbstractConnectionConfig {
* @param workingMode Working mode, can be 'memory' or 'storage'.
*/
public void setWorkingMode(String workingMode) {
WORKING_MODE.set(workingMode);
this.workingMode.set(workingMode);
}
}
......@@ -13,13 +13,19 @@
package com.oceanbase.clogproxy.client.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.junit.Assert;
import org.junit.Test;
public class ClientConfTest {
@Test
public void builderTest() {
public void testBuilderDefaultValues() {
ClientConf clientConf = ClientConf.builder().build();
Assert.assertEquals(clientConf.getTransferQueueSize(), 20000);
Assert.assertEquals(clientConf.getConnectTimeoutMs(), 5000);
......@@ -32,4 +38,21 @@ public class ClientConfTest {
Assert.assertFalse(clientConf.isIgnoreUnknownRecordType());
Assert.assertNull(clientConf.getSslContext());
}
@Test
public void testSerialization() throws IOException, ClassNotFoundException {
ClientConf clientConf = ClientConf.builder().build();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(clientConf);
outputStream.flush();
outputStream.close();
ObjectInputStream inputStream =
new ObjectInputStream(
new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
Object object = inputStream.readObject();
Assert.assertTrue(object instanceof ClientConf);
Assert.assertTrue(EqualsBuilder.reflectionEquals(object, clientConf));
}
}
/*
* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
* oblogclient is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package com.oceanbase.clogproxy.client.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
public class ObReaderConfigTest {
private static ObReaderConfig generateTestConfig() {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@test_tenant");
config.setPassword("password");
config.setStartTimestamp(0L);
config.setTableWhiteList("test_tenant.test.*");
config.setTableBlackList("|");
config.setTimezone("+8:00");
config.setWorkingMode("storage");
return config;
}
@Test
public void testSerialization() throws IOException, ClassNotFoundException {
ObReaderConfig config = generateTestConfig();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(config);
outputStream.flush();
outputStream.close();
ObjectInputStream inputStream =
new ObjectInputStream(
new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
Object object = inputStream.readObject();
Assert.assertTrue(object instanceof ObReaderConfig);
Map<String, String> configMap = ((ObReaderConfig) object).generateConfigurationMap(false);
Assert.assertEquals(configMap.size(), 8);
Assert.assertEquals(configMap, config.generateConfigurationMap(false));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册