diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java index ce17b2d1075cf010d3f4ce34b321856ad339084b..906be99ce1042f598efbf1842bf15558654c08a7 100644 --- a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java @@ -15,10 +15,9 @@ import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.config.ObReaderConfig; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.listener.RecordListener; +import com.oceanbase.oms.logmessage.DataMessage; import com.oceanbase.oms.logmessage.LogMessage; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import javax.net.ssl.SSLException; +import java.util.stream.Collectors; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -31,42 +30,38 @@ public class LogProxyClientTest { @Test public void testLogProxyClient() { - ObReaderConfig config = new ObReaderConfig(); - config.setRsList("127.0.0.1:2882:2881"); - config.setUsername("root@sys"); - config.setPassword("root@sys"); - config.setStartTimestamp(0L); - config.setTableWhiteList("sys.test.*"); - - LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config); - - client.addListener( - new RecordListener() { + // Can get it with "show parameters like 'obconfig_url'" + String clusterUrl = + "http://127.0.0.1:8080/services" + + "?Action=ObRootServiceInfo" + + "&User_ID=alibaba" + + "&UID=ocpmaster" + + "&ObRegion=obcluster"; - @Override - public void notify(LogMessage message) { - logger.info("LogMessage received: {}", message.toString()); - } + // Can get it with "show parameters like 'rootservice_list'" + String rsList = "127.0.0.1:2882:2881"; - @Override - public void onException(LogProxyClientException e) { - logger.error(e.getMessage()); - } - }); - client.start(); - client.join(); - } - - @Test - public void testLogProxyClientWithSsl() throws SSLException { ObReaderConfig config = new ObReaderConfig(); - config.setRsList("127.0.0.1:2882:2881"); - config.setUsername("root@sys"); - config.setPassword("root@sys"); - config.setStartTimestamp(0L); - config.setTableWhiteList("sys.test.*"); - ClientConf clientConf = ClientConf.builder().sslContext(sslContext()).build(); + // Either 'rsList' or 'clusterUrl' should be not empty, will try to use 'clusterUrl' firstly + config.setRsList(rsList); + // config.setClusterUrl(clusterUrl); + config.setUsername("root@test_tenant"); + config.setPassword("pswd"); + config.setStartTimestamp(0L); + config.setTableWhiteList("test_tenant.test_db.*"); + config.setTableBlackList("test_tenant.test_db.test_table"); + config.setTimezone("+8:00"); + config.setWorkingMode("memory"); + + ClientConf clientConf = + ClientConf.builder() + .transferQueueSize(1000) + .connectTimeoutMs(3000) + .maxReconnectTimes(100) + .ignoreUnknownRecordType(true) + .clientId("test") + .build(); LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf); @@ -75,7 +70,48 @@ public class LogProxyClientTest { @Override public void notify(LogMessage message) { - logger.info("LogMessage received: {}", message.toString()); + switch (message.getOpt()) { + case INSERT: + case UPDATE: + case DELETE: + // note that the db name contains prefix '{tenant}.' + logger.info( + "Received log message of type {}: db: {}, table: {}, checkpoint {}", + message.getOpt(), + message.getDbName(), + message.getTableName(), + message.getCheckpoint()); + // old fields for type 'UPDATE', 'DELETE' + logger.info( + "Old field values: {}", + message.getFieldList().stream() + .filter(DataMessage.Record.Field::isPrev) + .collect(Collectors.toList())); + // new fields for type 'UPDATE', 'INSERT' + logger.info( + "New field values: {}", + message.getFieldList().stream() + .filter(field -> !field.isPrev()) + .collect(Collectors.toList())); + break; + case HEARTBEAT: + logger.info( + "Received heartbeat message with checkpoint {}", + message.getCheckpoint()); + break; + case BEGIN: + case COMMIT: + logger.info("Received transaction message {}", message.getOpt()); + break; + case DDL: + logger.info( + "Received log message with DDL: {}", + message.getFieldList().get(0).getValue().toString()); + break; + default: + throw new IllegalArgumentException( + "Unsupported log message type: " + message.getOpt()); + } } @Override @@ -86,14 +122,4 @@ public class LogProxyClientTest { client.start(); client.join(); } - - private SslContext sslContext() throws SSLException { - return SslContextBuilder.forClient() - .sslProvider(SslContext.defaultClientProvider()) - .trustManager(this.getClass().getClassLoader().getResourceAsStream("certs/ca.crt")) - .keyManager( - this.getClass().getClassLoader().getResourceAsStream("certs/client.crt"), - this.getClass().getClassLoader().getResourceAsStream("certs/client.key")) - .build(); - } } diff --git a/logproxy-client/src/test/resources/certs/ca.crt b/logproxy-client/src/test/resources/certs/ca.crt deleted file mode 100644 index 70b6ad2836dcdda47d93cd6d0da4acacbca9c8c9..0000000000000000000000000000000000000000 --- a/logproxy-client/src/test/resources/certs/ca.crt +++ /dev/null @@ -1,16 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICeDCCAeGgAwIBAgIJAMK6EbZONqr5MA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV -BAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5 -MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBhbnkgTHRkMB4XDTIxMDkxMzA1NTk1MloX -DTIxMTAxMzA1NTk1MlowVTELMAkGA1UEBhMCQ04xETAPBgNVBAgMCFpoZUppYW5n -MRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFu -eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPA7Wl5b5q3anZoGorUE -9Q/7WexpY3YrjawseYwzBIgtuQlOG/K8EZO2VztcsY1fkKQfOy56Y4awpWPVsM13 -GAl/qkT6Dw358O14fadQ5UVnama8UB52DPsXHkvoZQBT45nXckWYTyycr3WhzlH/ -fwygORnTJujhcrk69IkROAjpAgMBAAGjUDBOMB0GA1UdDgQWBBRa9yv01ZQEklvt -o07396rg0lemyTAfBgNVHSMEGDAWgBRa9yv01ZQEklvto07396rg0lemyTAMBgNV -HRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GBAEHhtxKnQJ7DHcoloO61OI3TCxuf -9P/qYcLn+PkkAEHbOxG/+hLom/AgiI4licKGAtrNtxqfHc/05yFQYS6KMmtJWk3q -wVuCUVHoZbEdkubV0quBtr5MDpQC7IzRCFmPxNRt7G8EAYcoEsPoMFZCMEh4hj6i -KXXPa+iYoc2HHsX+ ------END CERTIFICATE----- diff --git a/logproxy-client/src/test/resources/certs/client.crt b/logproxy-client/src/test/resources/certs/client.crt deleted file mode 100644 index 89604d4ae5872bbff290064596692e178f708879..0000000000000000000000000000000000000000 --- a/logproxy-client/src/test/resources/certs/client.crt +++ /dev/null @@ -1,15 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICZjCCAc8CCQDnFsEOdj0oqDANBgkqhkiG9w0BAQsFADBVMQswCQYDVQQGEwJD -TjERMA8GA1UECAwIWmhlSmlhbmcxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoG -A1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDAeFw0yMTA5MTMxMTAzMTNaFw0yMTEw -MTMxMTAzMTNaMIGZMQswCQYDVQQGEwJDTjERMA8GA1UECAwIWmhlSmlhbmcxETAP -BgNVBAcMCEhhbmdaaG91MQ8wDQYDVQQKDAZBbnRGaW4xCzAJBgNVBAsMAk9CMRow -GAYDVQQDDBFrMDhqMTUyNjQuZXU5NXNxYTEqMCgGCSqGSIb3DQEJARYbd2FuZ3l1 -bmxhaS53eWxAYW50Z3JvdXAuY29tMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB -gQD3AucmBGU87Rqr87L3cjr/Ca6vXiKLxOVXDspSEVtpwiACeMK7NOvP9X+iXEOc -LLZWLJFIWWtiu9M5AXVsZe1pJci7bqAXsa96iaNQ8FdcLqy7MLSPj71bdJ8yv3wW -1Q2tzqRrKx/Q7YlJQ2RtRqJLQwfPOAhXKvYBzf3+ocDn+QIDAQABMA0GCSqGSIb3 -DQEBCwUAA4GBAGRMRgU+sT5JFX58AYY0PO1YAiz94LJoLeBq9NttDicOgFGk2wlo -fAlbcDYZd9EBHJ4Cn/LCVLAR1icRGClQpObNz8fYdcsihWzFYy2cGDZ6m2ACSKeh -Agn4XgEfGbwjgCQuKORYBkDAfA0OU2lFcfzkrigktvwFgRMLsyseexb0 ------END CERTIFICATE----- diff --git a/logproxy-client/src/test/resources/certs/client.key b/logproxy-client/src/test/resources/certs/client.key deleted file mode 100644 index 9048518624ba24bb2afb42adf274eb2bc733f0ad..0000000000000000000000000000000000000000 --- a/logproxy-client/src/test/resources/certs/client.key +++ /dev/null @@ -1,16 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAPcC5yYEZTztGqvz -svdyOv8Jrq9eIovE5VcOylIRW2nCIAJ4wrs068/1f6JcQ5wstlYskUhZa2K70zkB -dWxl7WklyLtuoBexr3qJo1DwV1wurLswtI+PvVt0nzK/fBbVDa3OpGsrH9DtiUlD -ZG1GoktDB884CFcq9gHN/f6hwOf5AgMBAAECgYEAp9AFEcLytI2xDRkngQzOH+6I -CwQ9HA/Mb3TQ5yh7nkIQVR2NznmJq2LyL2/XTwbhaXIl0a1OU2mfep8PE3G789nU -0a9DWteUxleS00OPvfsxkcKwqFLb3SFnGaZpZvShJ3Ug9mwnSRY59qQiTnbJ4lx3 -edvYmWr+dX/MGoyuUzECQQD73AExkNBvKi0Ti+C/GjmIBLRKobgege26rTPtXnYt -SEbd+USR9q1dD6EF/CFbHp6cbv1ir8sRyrIICIv83l/DAkEA+xJ+jlrFe8o99US0 -5i0kbZNqHTvgjXk/4eSXr3Bel02ljYSnP1FDxus+UXT6G1mUdymtCgWm9ppHNGm3 -Z/S5kwJBAIZFXRGKrcSGDK/+A5x+I6vDLkcXfmwtQosiKavjj0dG4BkY+hiDFRum -6GajazkD0vV9KnMBW1ap5E3qGI+AEjcCQAPL6MwARWI00bEGw/GDFzzs8LrWb/PT -tIqW6VBG07dX/jvgmKLVeL/mSL/0k9+cACm5IJu5MCgkdxUs0BArXC8CQHjZIuVA -1KVhJpRiy9HtbQczoLjSm2SkD17Azv9H7MrlM4Db4qKNX9NUzsXkgKOm9R639aF6 -b/uC6OcdsA0jPTE= ------END PRIVATE KEY-----