未验证 提交 9196de78 编写于 作者: H He Wang 提交者: GitHub

update test with more detailed example (#52)

上级 426f63aa
......@@ -15,10 +15,9 @@ import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import javax.net.ssl.SSLException;
import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
......@@ -31,42 +30,38 @@ public class LogProxyClientTest {
@Test
public void testLogProxyClient() {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@sys");
config.setPassword("root@sys");
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.test.*");
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config);
client.addListener(
new RecordListener() {
// Can get it with "show parameters like 'obconfig_url'"
String clusterUrl =
"http://127.0.0.1:8080/services"
+ "?Action=ObRootServiceInfo"
+ "&User_ID=alibaba"
+ "&UID=ocpmaster"
+ "&ObRegion=obcluster";
@Override
public void notify(LogMessage message) {
logger.info("LogMessage received: {}", message.toString());
}
// Can get it with "show parameters like 'rootservice_list'"
String rsList = "127.0.0.1:2882:2881";
@Override
public void onException(LogProxyClientException e) {
logger.error(e.getMessage());
}
});
client.start();
client.join();
}
@Test
public void testLogProxyClientWithSsl() throws SSLException {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@sys");
config.setPassword("root@sys");
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.test.*");
ClientConf clientConf = ClientConf.builder().sslContext(sslContext()).build();
// Either 'rsList' or 'clusterUrl' should be not empty, will try to use 'clusterUrl' firstly
config.setRsList(rsList);
// config.setClusterUrl(clusterUrl);
config.setUsername("root@test_tenant");
config.setPassword("pswd");
config.setStartTimestamp(0L);
config.setTableWhiteList("test_tenant.test_db.*");
config.setTableBlackList("test_tenant.test_db.test_table");
config.setTimezone("+8:00");
config.setWorkingMode("memory");
ClientConf clientConf =
ClientConf.builder()
.transferQueueSize(1000)
.connectTimeoutMs(3000)
.maxReconnectTimes(100)
.ignoreUnknownRecordType(true)
.clientId("test")
.build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf);
......@@ -75,7 +70,48 @@ public class LogProxyClientTest {
@Override
public void notify(LogMessage message) {
logger.info("LogMessage received: {}", message.toString());
switch (message.getOpt()) {
case INSERT:
case UPDATE:
case DELETE:
// note that the db name contains prefix '{tenant}.'
logger.info(
"Received log message of type {}: db: {}, table: {}, checkpoint {}",
message.getOpt(),
message.getDbName(),
message.getTableName(),
message.getCheckpoint());
// old fields for type 'UPDATE', 'DELETE'
logger.info(
"Old field values: {}",
message.getFieldList().stream()
.filter(DataMessage.Record.Field::isPrev)
.collect(Collectors.toList()));
// new fields for type 'UPDATE', 'INSERT'
logger.info(
"New field values: {}",
message.getFieldList().stream()
.filter(field -> !field.isPrev())
.collect(Collectors.toList()));
break;
case HEARTBEAT:
logger.info(
"Received heartbeat message with checkpoint {}",
message.getCheckpoint());
break;
case BEGIN:
case COMMIT:
logger.info("Received transaction message {}", message.getOpt());
break;
case DDL:
logger.info(
"Received log message with DDL: {}",
message.getFieldList().get(0).getValue().toString());
break;
default:
throw new IllegalArgumentException(
"Unsupported log message type: " + message.getOpt());
}
}
@Override
......@@ -86,14 +122,4 @@ public class LogProxyClientTest {
client.start();
client.join();
}
private SslContext sslContext() throws SSLException {
return SslContextBuilder.forClient()
.sslProvider(SslContext.defaultClientProvider())
.trustManager(this.getClass().getClassLoader().getResourceAsStream("certs/ca.crt"))
.keyManager(
this.getClass().getClassLoader().getResourceAsStream("certs/client.crt"),
this.getClass().getClassLoader().getResourceAsStream("certs/client.key"))
.build();
}
}
-----BEGIN CERTIFICATE-----
MIICeDCCAeGgAwIBAgIJAMK6EbZONqr5MA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV
BAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5
MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBhbnkgTHRkMB4XDTIxMDkxMzA1NTk1MloX
DTIxMTAxMzA1NTk1MlowVTELMAkGA1UEBhMCQ04xETAPBgNVBAgMCFpoZUppYW5n
MRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFu
eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPA7Wl5b5q3anZoGorUE
9Q/7WexpY3YrjawseYwzBIgtuQlOG/K8EZO2VztcsY1fkKQfOy56Y4awpWPVsM13
GAl/qkT6Dw358O14fadQ5UVnama8UB52DPsXHkvoZQBT45nXckWYTyycr3WhzlH/
fwygORnTJujhcrk69IkROAjpAgMBAAGjUDBOMB0GA1UdDgQWBBRa9yv01ZQEklvt
o07396rg0lemyTAfBgNVHSMEGDAWgBRa9yv01ZQEklvto07396rg0lemyTAMBgNV
HRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GBAEHhtxKnQJ7DHcoloO61OI3TCxuf
9P/qYcLn+PkkAEHbOxG/+hLom/AgiI4licKGAtrNtxqfHc/05yFQYS6KMmtJWk3q
wVuCUVHoZbEdkubV0quBtr5MDpQC7IzRCFmPxNRt7G8EAYcoEsPoMFZCMEh4hj6i
KXXPa+iYoc2HHsX+
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICZjCCAc8CCQDnFsEOdj0oqDANBgkqhkiG9w0BAQsFADBVMQswCQYDVQQGEwJD
TjERMA8GA1UECAwIWmhlSmlhbmcxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoG
A1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDAeFw0yMTA5MTMxMTAzMTNaFw0yMTEw
MTMxMTAzMTNaMIGZMQswCQYDVQQGEwJDTjERMA8GA1UECAwIWmhlSmlhbmcxETAP
BgNVBAcMCEhhbmdaaG91MQ8wDQYDVQQKDAZBbnRGaW4xCzAJBgNVBAsMAk9CMRow
GAYDVQQDDBFrMDhqMTUyNjQuZXU5NXNxYTEqMCgGCSqGSIb3DQEJARYbd2FuZ3l1
bmxhaS53eWxAYW50Z3JvdXAuY29tMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQD3AucmBGU87Rqr87L3cjr/Ca6vXiKLxOVXDspSEVtpwiACeMK7NOvP9X+iXEOc
LLZWLJFIWWtiu9M5AXVsZe1pJci7bqAXsa96iaNQ8FdcLqy7MLSPj71bdJ8yv3wW
1Q2tzqRrKx/Q7YlJQ2RtRqJLQwfPOAhXKvYBzf3+ocDn+QIDAQABMA0GCSqGSIb3
DQEBCwUAA4GBAGRMRgU+sT5JFX58AYY0PO1YAiz94LJoLeBq9NttDicOgFGk2wlo
fAlbcDYZd9EBHJ4Cn/LCVLAR1icRGClQpObNz8fYdcsihWzFYy2cGDZ6m2ACSKeh
Agn4XgEfGbwjgCQuKORYBkDAfA0OU2lFcfzkrigktvwFgRMLsyseexb0
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAPcC5yYEZTztGqvz
svdyOv8Jrq9eIovE5VcOylIRW2nCIAJ4wrs068/1f6JcQ5wstlYskUhZa2K70zkB
dWxl7WklyLtuoBexr3qJo1DwV1wurLswtI+PvVt0nzK/fBbVDa3OpGsrH9DtiUlD
ZG1GoktDB884CFcq9gHN/f6hwOf5AgMBAAECgYEAp9AFEcLytI2xDRkngQzOH+6I
CwQ9HA/Mb3TQ5yh7nkIQVR2NznmJq2LyL2/XTwbhaXIl0a1OU2mfep8PE3G789nU
0a9DWteUxleS00OPvfsxkcKwqFLb3SFnGaZpZvShJ3Ug9mwnSRY59qQiTnbJ4lx3
edvYmWr+dX/MGoyuUzECQQD73AExkNBvKi0Ti+C/GjmIBLRKobgege26rTPtXnYt
SEbd+USR9q1dD6EF/CFbHp6cbv1ir8sRyrIICIv83l/DAkEA+xJ+jlrFe8o99US0
5i0kbZNqHTvgjXk/4eSXr3Bel02ljYSnP1FDxus+UXT6G1mUdymtCgWm9ppHNGm3
Z/S5kwJBAIZFXRGKrcSGDK/+A5x+I6vDLkcXfmwtQosiKavjj0dG4BkY+hiDFRum
6GajazkD0vV9KnMBW1ap5E3qGI+AEjcCQAPL6MwARWI00bEGw/GDFzzs8LrWb/PT
tIqW6VBG07dX/jvgmKLVeL/mSL/0k9+cACm5IJu5MCgkdxUs0BArXC8CQHjZIuVA
1KVhJpRiy9HtbQczoLjSm2SkD17Azv9H7MrlM4Db4qKNX9NUzsXkgKOm9R639aF6
b/uC6OcdsA0jPTE=
-----END PRIVATE KEY-----
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册