未验证 提交 7d93664d 编写于 作者: H He Wang 提交者: GitHub

add obcdc configurations to connection config (#36)

* add cluster_url

* update doc

* add table blacklist and timezone

* update doc

* update doc about obcdc
上级 ba41dbf2
...@@ -59,13 +59,16 @@ When `LogProxyClient.start()` is executed, a new thread will be created in `Clie ...@@ -59,13 +59,16 @@ When `LogProxyClient.start()` is executed, a new thread will be created in `Clie
To connect to LogProxy, there are some parameters to set in `ObReaderConfig`: To connect to LogProxy, there are some parameters to set in `ObReaderConfig`:
- *rootserver_list*: Root server list of OceanBase cluster in format `ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2`, IP address here must be able to be resolved by LogProxy. - *cluster_url*: Cluster config url used to set up the OBConfig service. Required for OceanBase Enterprise Edition.
- *rootserver_list*: Root server list of OceanBase cluster in format `ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2`, IP address here must be able to be resolved by LogProxy. Required for OceanBase Community Edition.
- *cluster_username*: Username for OceanBase, the format is `username@tenant_name#cluster_name` when connecting to [obproxy](https://github.com/oceanbase/obproxy) or `username@tenant_name` when directly connecting to OceanBase server. - *cluster_username*: Username for OceanBase, the format is `username@tenant_name#cluster_name` when connecting to [obproxy](https://github.com/oceanbase/obproxy) or `username@tenant_name` when directly connecting to OceanBase server.
- *cluster_password*: Password for OceanBase when using configured `cluster_username`. - *cluster_password*: Password for OceanBase when using configured `cluster_username`.
- *first_start_timestamp*: Start timestamp in seconds, and zero means starting from now. - *first_start_timestamp*: Start timestamp in seconds, and zero means starting from now. Default is `0`.
- *tb_white_list*: Table whitelist in format `tenant_name.database_name.table_name`, `*` indicates any value, and multiple values can be separated by `|`. - *tb_white_list*: Table whitelist in format `tenant_name.database_name.table_name`, `*` indicates any value, and multiple values can be separated by `|`. Default is `*.*.*`.
- *tb_black_list*: Table blacklist in the same format with whitelist. Default is `|`.
- *timezone*: Timezone offset from UTC. Default value is `+8:00`.
These parameters are used in `liboblog`, you can check the [doc](https://github.com/oceanbase/oceanbase-doc/blob/V3.1.2/zh-CN/9.supporting-tools/4.cdc/2.liboblog/2.liboblog-parameters/2.liboblog-configuration-items.md) for more details. These parameters are used in `obcdc` (former `liboblog`), and the items not listed above can be passed to `obcdc` through the `ObReaderConfig` constructor with parameters.
Here is an example to set ObReaderConfig with a user of sys tenant, and the OceanBase and LogProxy server are on the same machine. Here is an example to set ObReaderConfig with a user of sys tenant, and the OceanBase and LogProxy server are on the same machine.
......
...@@ -18,6 +18,7 @@ import com.oceanbase.clogproxy.common.util.CryptoUtil; ...@@ -18,6 +18,7 @@ import com.oceanbase.clogproxy.common.util.CryptoUtil;
import com.oceanbase.clogproxy.common.util.Hex; import com.oceanbase.clogproxy.common.util.Hex;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -25,6 +26,9 @@ import org.slf4j.LoggerFactory; ...@@ -25,6 +26,9 @@ import org.slf4j.LoggerFactory;
public class ObReaderConfig extends AbstractConnectionConfig { public class ObReaderConfig extends AbstractConnectionConfig {
private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class); private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class);
/** Cluster config url. */
private static final ConfigItem<String> CLUSTER_URL = new ConfigItem<>("cluster_url", "");
/** Root server list. */ /** Root server list. */
private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", ""); private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", "");
...@@ -37,12 +41,19 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -37,12 +41,19 @@ public class ObReaderConfig extends AbstractConnectionConfig {
/** Table whitelist. */ /** Table whitelist. */
private static final ConfigItem<String> TABLE_WHITE_LIST = private static final ConfigItem<String> TABLE_WHITE_LIST =
new ConfigItem<>("tb_white_list", ""); new ConfigItem<>("tb_white_list", "*.*.*");
/** Table blacklist. */
private static final ConfigItem<String> TABLE_BLACK_LIST =
new ConfigItem<>("tb_black_list", "|");
/** Start timestamp. */ /** Start timestamp. */
private static final ConfigItem<Long> START_TIMESTAMP = private static final ConfigItem<Long> START_TIMESTAMP =
new ConfigItem<>("first_start_timestamp", 0L); new ConfigItem<>("first_start_timestamp", 0L);
/** Timezone offset. */
private static final ConfigItem<String> TIME_ZONE = new ConfigItem<>("timezone", "+8:00");
/** Constructor with empty arguments. */ /** Constructor with empty arguments. */
public ObReaderConfig() { public ObReaderConfig() {
super(new HashMap<>()); super(new HashMap<>());
...@@ -65,7 +76,9 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -65,7 +76,9 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override @Override
public boolean valid() { public boolean valid() {
try { try {
Validator.notEmpty(RS_LIST.val, "invalid rsList"); if (StringUtils.isEmpty(CLUSTER_URL.val) && StringUtils.isEmpty(RS_LIST.val)) {
throw new IllegalArgumentException("empty clusterUrl or rsList");
}
Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser"); Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser");
Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword"); Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword");
if (START_TIMESTAMP.val < 0L) { if (START_TIMESTAMP.val < 0L) {
...@@ -83,6 +96,11 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -83,6 +96,11 @@ public class ObReaderConfig extends AbstractConnectionConfig {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) { for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString(); String value = entry.getValue().val.toString();
// Empty `cluster_url` should be discarded, otherwise the server will
// use it as a valid value by mistake.
if (CLUSTER_URL.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) { if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value)); value = Hex.str(CryptoUtil.sha1(value));
} }
...@@ -106,15 +124,25 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -106,15 +124,25 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override @Override
public String toString() { public String toString() {
return "rootserver_list=" return (StringUtils.isNotEmpty(CLUSTER_URL.val))
+ RS_LIST ? ("cluster_url=" + CLUSTER_URL)
+ ", cluster_user=" : ("rootserver_list=" + RS_LIST)
+ CLUSTER_USER + ", cluster_user="
+ ", cluster_password=******, " + CLUSTER_USER
+ "tb_white_list=" + ", cluster_password=******, "
+ TABLE_WHITE_LIST + "tb_white_list="
+ ", start_timestamp=" + TABLE_WHITE_LIST
+ START_TIMESTAMP; + ", start_timestamp="
+ START_TIMESTAMP;
}
/**
* Set cluster config url.
*
* @param clusterUrl Cluster config url.
*/
public void setClusterUrl(String clusterUrl) {
CLUSTER_URL.set(clusterUrl);
} }
/** /**
...@@ -154,6 +182,15 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -154,6 +182,15 @@ public class ObReaderConfig extends AbstractConnectionConfig {
TABLE_WHITE_LIST.set(tableWhiteList); TABLE_WHITE_LIST.set(tableWhiteList);
} }
/**
* Set table blacklist, the format is same with table whitelist.
*
* @param tableBlackList Table blacklist.
*/
public void setTableBlackList(String tableBlackList) {
TABLE_BLACK_LIST.set(tableBlackList);
}
/** /**
* Set start timestamp, zero means from now on. * Set start timestamp, zero means from now on.
* *
...@@ -162,4 +199,13 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -162,4 +199,13 @@ public class ObReaderConfig extends AbstractConnectionConfig {
public void setStartTimestamp(Long startTimestamp) { public void setStartTimestamp(Long startTimestamp) {
START_TIMESTAMP.set(startTimestamp); START_TIMESTAMP.set(startTimestamp);
} }
/**
* Set the timezone which is used to convert timestamp column.
*
* @param timezone Timezone offset from UTC, the value is `+8:00` by default.
*/
public void setTimezone(String timezone) {
TIME_ZONE.set(timezone);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册