From 3c94deae37640c4d7a4bd36542c198ccde8eea30 Mon Sep 17 00:00:00 2001 From: He Wang Date: Fri, 17 Jun 2022 15:25:49 +0800 Subject: [PATCH] update config class for serialization (#45) * update config class for serialization * fix merged changes * fix initial order --- .../config/AbstractConnectionConfig.java | 12 +- .../clogproxy/client/config/ClientConf.java | 8 +- .../client/config/ConnectionConfig.java | 14 ++- .../client/config/ObReaderConfig.java | 103 +++++++++++------- .../client/config/ClientConfTest.java | 25 ++++- .../client/config/ObReaderConfigTest.java | 59 ++++++++++ 6 files changed, 171 insertions(+), 50 deletions(-) create mode 100644 logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ObReaderConfigTest.java diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java index 182159e..533e90b 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java @@ -13,6 +13,7 @@ package com.oceanbase.clogproxy.client.config; import com.oceanbase.clogproxy.common.packet.LogType; import com.oceanbase.clogproxy.common.util.TypeTrait; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -21,7 +22,7 @@ import java.util.Map.Entry; public abstract class AbstractConnectionConfig implements ConnectionConfig { /** Defined configurations map. */ - protected static Map> configs = new HashMap<>(); + protected final Map> configs = new HashMap<>(); /** Extra configurations map. */ protected final Map extraConfigs = new HashMap<>(); @@ -32,7 +33,10 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig { * @param The type of stored value. */ @SuppressWarnings("unchecked") - protected static class ConfigItem { + protected class ConfigItem implements Serializable { + + private static final long serialVersionUID = 1L; + protected String key; protected T val; @@ -73,11 +77,11 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig { } /** - * Sole constructor. + * Set configs. * * @param allConfigs The map of configurations. */ - public AbstractConnectionConfig(Map allConfigs) { + public void setConfigs(Map allConfigs) { if (allConfigs != null) { for (Entry entry : allConfigs.entrySet()) { if (!configs.containsKey(entry.getKey())) { diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java index cbbf842..8a3de20 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java @@ -14,11 +14,15 @@ package com.oceanbase.clogproxy.client.config; import com.oceanbase.clogproxy.client.util.ClientIdGenerator; import com.oceanbase.clogproxy.common.config.SharedConf; import io.netty.handler.ssl.SslContext; +import java.io.Serializable; /** The class that defines the constants that are used to generate the connection. */ -public class ClientConf extends SharedConf { +public class ClientConf extends SharedConf implements Serializable { + + private static final long serialVersionUID = 1L; + /** Client version. */ - public static final String VERSION = "1.0.1"; + public static final String VERSION = "1.0.5"; /** Queue size for storing records received from log proxy. */ private final int transferQueueSize; diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java index 199fd46..9714cbb 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java @@ -10,8 +10,12 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.config; + +import java.io.Serializable; +import java.util.Map; + /** This is the interface of connection config. */ -public interface ConnectionConfig { +public interface ConnectionConfig extends Serializable { /** * Generate a configuration string from connection parameters. @@ -20,6 +24,14 @@ public interface ConnectionConfig { */ String generateConfigurationString(); + /** + * Generate a configuration map from connection parameters. + * + * @param encryptPassword The flag of whether encrypt the password. + * @return The configuration map. + */ + Map generateConfigurationMap(boolean encryptPassword); + /** * Update the checkpoint. * diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java index b2ce514..971bece 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java @@ -24,44 +24,40 @@ import org.slf4j.LoggerFactory; /** This is a configuration class for connection to log proxy. */ public class ObReaderConfig extends AbstractConnectionConfig { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class); /** Cluster config url. */ - private static final ConfigItem CLUSTER_URL = new ConfigItem<>("cluster_url", ""); + private final ConfigItem clusterUrl = new ConfigItem<>("cluster_url", ""); /** Root server list. */ - private static final ConfigItem RS_LIST = new ConfigItem<>("rootserver_list", ""); + private final ConfigItem rsList = new ConfigItem<>("rootserver_list", ""); /** Cluster username. */ - private static final ConfigItem CLUSTER_USER = new ConfigItem<>("cluster_user", ""); + private final ConfigItem clusterUser = new ConfigItem<>("cluster_user", ""); /** Cluster password. */ - private static final ConfigItem CLUSTER_PASSWORD = - new ConfigItem<>("cluster_password", ""); + private final ConfigItem clusterPassword = new ConfigItem<>("cluster_password", ""); /** Table whitelist. */ - private static final ConfigItem TABLE_WHITE_LIST = - new ConfigItem<>("tb_white_list", "*.*.*"); + private final ConfigItem tableWhitelist = new ConfigItem<>("tb_white_list", "*.*.*"); /** Table blacklist. */ - private static final ConfigItem TABLE_BLACK_LIST = - new ConfigItem<>("tb_black_list", "|"); + private final ConfigItem tableBlacklist = new ConfigItem<>("tb_black_list", "|"); /** Start timestamp. */ - private static final ConfigItem START_TIMESTAMP = - new ConfigItem<>("first_start_timestamp", 0L); + private final ConfigItem startTimestamp = new ConfigItem<>("first_start_timestamp", 0L); /** Timezone offset. */ - private static final ConfigItem TIME_ZONE = new ConfigItem<>("timezone", "+8:00"); + private final ConfigItem timezone = new ConfigItem<>("timezone", "+8:00"); /** Working mode. */ - private static final ConfigItem WORKING_MODE = - new ConfigItem<>("working_mode", "storage"); + private final ConfigItem workingMode = new ConfigItem<>("working_mode", "storage"); /** Constructor with empty arguments. */ - public ObReaderConfig() { - super(new HashMap<>()); - } + public ObReaderConfig() {} /** * Constructor with a config map. @@ -69,7 +65,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param allConfigs Config map. */ public ObReaderConfig(Map allConfigs) { - super(allConfigs); + setConfigs(allConfigs); } @Override @@ -80,12 +76,12 @@ public class ObReaderConfig extends AbstractConnectionConfig { @Override public boolean valid() { try { - if (StringUtils.isEmpty(CLUSTER_URL.val) && StringUtils.isEmpty(RS_LIST.val)) { + if (StringUtils.isEmpty(clusterUrl.val) && StringUtils.isEmpty(rsList.val)) { throw new IllegalArgumentException("empty clusterUrl or rsList"); } - Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser"); - Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword"); - if (START_TIMESTAMP.val < 0L) { + Validator.notEmpty(clusterUser.val, "invalid clusterUser"); + Validator.notEmpty(clusterPassword.val, "invalid clusterPassword"); + if (startTimestamp.val < 0L) { throw new IllegalArgumentException("invalid startTimestamp"); } return true; @@ -102,10 +98,10 @@ public class ObReaderConfig extends AbstractConnectionConfig { String value = entry.getValue().val.toString(); // Empty `cluster_url` should be discarded, otherwise the server will // use it as a valid value by mistake. - if (CLUSTER_URL.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) { + if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) { continue; } - if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) { + if (clusterPassword.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) { value = Hex.str(CryptoUtil.sha1(value)); } sb.append(entry.getKey()).append("=").append(value).append(" "); @@ -117,10 +113,31 @@ public class ObReaderConfig extends AbstractConnectionConfig { return sb.toString(); } + @Override + public Map generateConfigurationMap(boolean encrypt_password) { + Map result = new HashMap<>(); + for (Map.Entry> entry : configs.entrySet()) { + String value = entry.getValue().val.toString(); + // Empty `cluster_url` should be discarded, otherwise the server will + // use it as a valid value by mistake. + if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) { + continue; + } + if (encrypt_password + && clusterPassword.key.equals(entry.getKey()) + && SharedConf.AUTH_PASSWORD_HASH) { + value = Hex.str(CryptoUtil.sha1(value)); + } + result.put(entry.getKey(), value); + } + result.putAll(extraConfigs); + return result; + } + @Override public void updateCheckpoint(String checkpoint) { try { - START_TIMESTAMP.set(Long.parseLong(checkpoint)); + startTimestamp.set(Long.parseLong(checkpoint)); } catch (NumberFormatException e) { // do nothing } @@ -128,20 +145,22 @@ public class ObReaderConfig extends AbstractConnectionConfig { @Override public String toString() { - return (StringUtils.isNotEmpty(CLUSTER_URL.val)) - ? ("cluster_url=" + CLUSTER_URL) - : ("rootserver_list=" + RS_LIST) + return (StringUtils.isNotEmpty(clusterUrl.val)) + ? ("cluster_url=" + clusterUrl) + : ("rootserver_list=" + rsList) + ", cluster_user=" - + CLUSTER_USER + + clusterUser + ", cluster_password=******, " + "tb_white_list=" - + TABLE_WHITE_LIST + + tableWhitelist + ", tb_black_list=" - + TABLE_BLACK_LIST + + tableBlacklist + ", start_timestamp=" - + START_TIMESTAMP + + startTimestamp + ", timezone=" - + TIME_ZONE; + + timezone + + ", working_mode=" + + workingMode; } /** @@ -150,7 +169,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param clusterUrl Cluster config url. */ public void setClusterUrl(String clusterUrl) { - CLUSTER_URL.set(clusterUrl); + this.clusterUrl.set(clusterUrl); } /** @@ -159,7 +178,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param rsList Root server list. */ public void setRsList(String rsList) { - RS_LIST.set(rsList); + this.rsList.set(rsList); } /** @@ -168,7 +187,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param clusterUser Cluster username. */ public void setUsername(String clusterUser) { - CLUSTER_USER.set(clusterUser); + this.clusterUser.set(clusterUser); } /** @@ -177,7 +196,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param clusterPassword Cluster password. */ public void setPassword(String clusterPassword) { - CLUSTER_PASSWORD.set(clusterPassword); + this.clusterPassword.set(clusterPassword); } /** @@ -187,7 +206,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param tableWhiteList Table whitelist. */ public void setTableWhiteList(String tableWhiteList) { - TABLE_WHITE_LIST.set(tableWhiteList); + tableWhitelist.set(tableWhiteList); } /** @@ -196,7 +215,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param tableBlackList Table blacklist. */ public void setTableBlackList(String tableBlackList) { - TABLE_BLACK_LIST.set(tableBlackList); + tableBlacklist.set(tableBlackList); } /** @@ -205,7 +224,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param startTimestamp Start timestamp. */ public void setStartTimestamp(Long startTimestamp) { - START_TIMESTAMP.set(startTimestamp); + this.startTimestamp.set(startTimestamp); } /** @@ -214,7 +233,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param timezone Timezone offset from UTC, the value is `+8:00` by default. */ public void setTimezone(String timezone) { - TIME_ZONE.set(timezone); + this.timezone.set(timezone); } /** @@ -223,6 +242,6 @@ public class ObReaderConfig extends AbstractConnectionConfig { * @param workingMode Working mode, can be 'memory' or 'storage'. */ public void setWorkingMode(String workingMode) { - WORKING_MODE.set(workingMode); + this.workingMode.set(workingMode); } } diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java index 9d72c76..42b1ad6 100644 --- a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java @@ -13,13 +13,19 @@ package com.oceanbase.clogproxy.client.config; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.junit.Assert; import org.junit.Test; public class ClientConfTest { @Test - public void builderTest() { + public void testBuilderDefaultValues() { ClientConf clientConf = ClientConf.builder().build(); Assert.assertEquals(clientConf.getTransferQueueSize(), 20000); Assert.assertEquals(clientConf.getConnectTimeoutMs(), 5000); @@ -32,4 +38,21 @@ public class ClientConfTest { Assert.assertFalse(clientConf.isIgnoreUnknownRecordType()); Assert.assertNull(clientConf.getSslContext()); } + + @Test + public void testSerialization() throws IOException, ClassNotFoundException { + ClientConf clientConf = ClientConf.builder().build(); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream); + outputStream.writeObject(clientConf); + outputStream.flush(); + outputStream.close(); + ObjectInputStream inputStream = + new ObjectInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + Object object = inputStream.readObject(); + Assert.assertTrue(object instanceof ClientConf); + Assert.assertTrue(EqualsBuilder.reflectionEquals(object, clientConf)); + } } diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ObReaderConfigTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ObReaderConfigTest.java new file mode 100644 index 0000000..67ab61e --- /dev/null +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ObReaderConfigTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. + * oblogclient is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package com.oceanbase.clogproxy.client.config; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; + +public class ObReaderConfigTest { + + private static ObReaderConfig generateTestConfig() { + ObReaderConfig config = new ObReaderConfig(); + config.setRsList("127.0.0.1:2882:2881"); + config.setUsername("root@test_tenant"); + config.setPassword("password"); + config.setStartTimestamp(0L); + config.setTableWhiteList("test_tenant.test.*"); + config.setTableBlackList("|"); + config.setTimezone("+8:00"); + config.setWorkingMode("storage"); + return config; + } + + @Test + public void testSerialization() throws IOException, ClassNotFoundException { + ObReaderConfig config = generateTestConfig(); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream); + outputStream.writeObject(config); + outputStream.flush(); + outputStream.close(); + ObjectInputStream inputStream = + new ObjectInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + Object object = inputStream.readObject(); + + Assert.assertTrue(object instanceof ObReaderConfig); + Map configMap = ((ObReaderConfig) object).generateConfigurationMap(false); + Assert.assertEquals(configMap.size(), 8); + Assert.assertEquals(configMap, config.generateConfigurationMap(false)); + } +} -- GitLab