提交 505516a5 编写于 作者: Q qiulu3

add unit test

上级 1569bf53
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.metadata;
import org.junit.Test;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class JdbcUriTest {
@Test
public void assertInitJdbcUri() {
JdbcUri jdbcUri = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db");
assertThat(jdbcUri.getHostname(), is("127.0.0.1"));
assertThat(jdbcUri.getPort(), is(3306));
assertThat(jdbcUri.getHost(), is("127.0.0.1:3306"));
assertThat(jdbcUri.getDatabase(), is("test_db"));
assertThat(jdbcUri.getScheme(), is("mysql"));
}
@Test
public void assertGetParameters() {
Map<String, String> parameters = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db?useSSL=true&maxReconnects=30").getParameters();
assertThat(parameters.size(), is(2));
assertThat(parameters.get("useSSL"), is("true"));
assertThat(parameters.get("maxReconnects"), is("30"));
}
}
......@@ -99,22 +99,24 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
}
private void handleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
if (event instanceof PlaceholderEvent || filter(uri.getDatabase(), (AbstractRowsEvent) event)) {
createPlaceholderRecord(event);
return;
}
if (event instanceof WriteRowsEvent) {
handleWriteRowsEvent(uri, (WriteRowsEvent) event);
handleWriteRowsEvent((WriteRowsEvent) event);
} else if (event instanceof UpdateRowsEvent) {
handleUpdateRowsEvent(uri, (UpdateRowsEvent) event);
handleUpdateRowsEvent((UpdateRowsEvent) event);
} else if (event instanceof DeleteRowsEvent) {
handleDeleteRowsEvent(uri, (DeleteRowsEvent) event);
} else if (event instanceof PlaceholderEvent) {
createPlaceholderRecord(event);
handleDeleteRowsEvent((DeleteRowsEvent) event);
}
}
private void handleWriteRowsEvent(final JdbcUri uri, final WriteRowsEvent event) {
if (filter(uri.getDatabase(), event.getSchemaName(), event.getTableName())) {
createPlaceholderRecord(event);
return;
}
private boolean filter(final String database, final AbstractRowsEvent event) {
return !event.getSchemaName().equals(database) || !dumperConfiguration.getTableNameMap().containsKey(event.getTableName());
}
private void handleWriteRowsEvent(final WriteRowsEvent event) {
TableMetaData tableMetaData = metaDataManager.getTableMetaData(event.getTableName());
for (Serializable[] each : event.getAfterRows()) {
DataRecord record = createDataRecord(event, each.length);
......@@ -126,11 +128,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
}
}
private void handleUpdateRowsEvent(final JdbcUri uri, final UpdateRowsEvent event) {
if (filter(uri.getDatabase(), event.getSchemaName(), event.getTableName())) {
createPlaceholderRecord(event);
return;
}
private void handleUpdateRowsEvent(final UpdateRowsEvent event) {
TableMetaData tableMetaData = metaDataManager.getTableMetaData(event.getTableName());
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
......@@ -146,11 +144,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
}
}
private void handleDeleteRowsEvent(final JdbcUri uri, final DeleteRowsEvent event) {
if (filter(uri.getDatabase(), event.getSchemaName(), event.getTableName())) {
createPlaceholderRecord(event);
return;
}
private void handleDeleteRowsEvent(final DeleteRowsEvent event) {
TableMetaData tableMetaData = metaDataManager.getTableMetaData(event.getTableName());
for (Serializable[] each : event.getBeforeRows()) {
DataRecord record = createDataRecord(event, each.length);
......@@ -181,8 +175,4 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
} catch (final InterruptedException ignored) {
}
}
private boolean filter(final String database, final String schemaName, final String tableName) {
return !schemaName.equals(database) || !dumperConfiguration.getTableNameMap().containsKey(tableName);
}
}
......@@ -61,7 +61,7 @@ public final class MySQLPositionManager extends BasePositionManager<BinlogPositi
binlogPosition.setServerId(getServerId(connection));
setPosition(binlogPosition);
} catch (final SQLException ex) {
throw new RuntimeException("markPosition error", ex);
throw new RuntimeException("init position failed.", ex);
}
}
......
......@@ -25,7 +25,7 @@ import lombok.Setter;
*/
@Getter
@Setter
public class AbstractRowsEvent extends AbstractBinlogEvent {
public abstract class AbstractRowsEvent extends AbstractBinlogEvent {
private String schemaName;
......
......@@ -21,7 +21,6 @@ import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.BitSet;
import java.util.List;
/**
......@@ -34,6 +33,4 @@ public final class UpdateRowsEvent extends AbstractRowsEvent {
private List<Serializable[]> beforeRows;
private List<Serializable[]> afterRows;
private BitSet changedBitmap;
}
......@@ -17,12 +17,153 @@
package org.apache.shardingsphere.scaling.mysql;
import lombok.SneakyThrows;
import org.apache.commons.collections4.map.HashedMap;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.WriteRowsEvent;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class MySQLBinlogDumperTest {
private static final String URL = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
private MySQLBinlogDumper mySQLBinlogDumper;
private MemoryChannel channel;
@Before
public void setUp() {
ScalingContext.getInstance().init(new ServerConfiguration());
DumperConfiguration dumperConfiguration = mockDumperConfiguration();
initTableData(dumperConfiguration);
channel = new MemoryChannel(records -> {
});
mySQLBinlogDumper = new MySQLBinlogDumper(dumperConfiguration, new BinlogPosition("binlog-000001", 4L));
mySQLBinlogDumper.setChannel(channel);
}
private DumperConfiguration mockDumperConfiguration() {
JDBCDataSourceConfiguration jdbcDataSourceConfiguration =
new JDBCDataSourceConfiguration(URL, "root", "root");
DumperConfiguration dumperConfiguration = new DumperConfiguration();
dumperConfiguration.setDataSourceConfiguration(jdbcDataSourceConfiguration);
Map<String, String> tableNameMap = new HashedMap<>(1);
tableNameMap.put("t_order", "t_order");
dumperConfiguration.setTableNameMap(tableNameMap);
return dumperConfiguration;
}
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
DataSource dataSource = new DataSourceManager().getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
@Test
public void assertWriteRowsEvent() {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setAfterRows(rows);
invokeHandleEvent(new JdbcUri(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
assertThat(((DataRecord) records.get(0)).getType(), is(ScalingConstant.INSERT));
}
@Test
public void assertUpdateRowsEvent() {
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
List<Serializable[]> beforeRows = new ArrayList<>(1);
beforeRows.add(new String[]{"1", "order_old"});
List<Serializable[]> afterRows = new ArrayList<>(1);
afterRows.add(new String[]{"1", "order_new"});
rowsEvent.setBeforeRows(beforeRows);
rowsEvent.setAfterRows(afterRows);
invokeHandleEvent(new JdbcUri(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
assertThat(((DataRecord) records.get(0)).getType(), is(ScalingConstant.UPDATE));
}
@Test
public void assertDeleteRowsEvent() {
DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setBeforeRows(rows);
invokeHandleEvent(new JdbcUri(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
assertThat(((DataRecord) records.get(0)).getType(), is(ScalingConstant.DELETE));
}
@Test
public void assertPlaceholderEvent() {
invokeHandleEvent(new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db"), new PlaceholderEvent());
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
}
@Test
public void assertStart() {
public void assertRowsEventFiltered() {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setSchemaName("unknown_schema");
invokeHandleEvent(new JdbcUri(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
}
@SneakyThrows({NoSuchMethodException.class, ReflectiveOperationException.class})
private void invokeHandleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
Method handleEvent = MySQLBinlogDumper.class.getDeclaredMethod("handleEvent", JdbcUri.class, AbstractBinlogEvent.class);
handleEvent.setAccessible(true);
handleEvent.invoke(mySQLBinlogDumper, uri, event);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.mysql.spi;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.mysql.MySQLBinlogDumper;
import org.apache.shardingsphere.scaling.mysql.MySQLDataSourceChecker;
import org.apache.shardingsphere.scaling.mysql.MySQLImporter;
import org.apache.shardingsphere.scaling.mysql.MySQLJdbcDumper;
import org.apache.shardingsphere.scaling.mysql.MySQLPositionManager;
import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class ScalingEntryLoaderTest {
@Test
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("MySQL");
assertTrue(scalingEntry instanceof MySQLScalingEntry);
assertThat(scalingEntry.getPositionManager(), equalTo(MySQLPositionManager.class));
assertThat(scalingEntry.getCheckerClass(), equalTo(MySQLDataSourceChecker.class));
assertThat(scalingEntry.getImporterClass(), equalTo(MySQLImporter.class));
assertThat(scalingEntry.getJdbcDumperClass(), equalTo(MySQLJdbcDumper.class));
assertThat(scalingEntry.getLogDumperClass(), equalTo(MySQLBinlogDumper.class));
}
}
......@@ -66,7 +66,7 @@ public final class PostgreSQLPositionManager extends BasePositionManager<WalPosi
createIfNotExists(connection);
setPosition(getWalPosition(connection));
} catch (final SQLException ex) {
throw new RuntimeException("markPosition error", ex);
throw new RuntimeException("init position failed.", ex);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql.spi;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLDataSourceChecker;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLImporter;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLJdbcDumper;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLPositionManager;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLScalingEntry;
import org.apache.shardingsphere.scaling.postgresql.PostgreSQLWalDumper;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class ScalingEntryLoaderTest {
@Test
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("PostgreSQL");
assertTrue(scalingEntry instanceof PostgreSQLScalingEntry);
assertThat(scalingEntry.getPositionManager(), equalTo(PostgreSQLPositionManager.class));
assertThat(scalingEntry.getCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
assertThat(scalingEntry.getImporterClass(), equalTo(PostgreSQLImporter.class));
assertThat(scalingEntry.getJdbcDumperClass(), equalTo(PostgreSQLJdbcDumper.class));
assertThat(scalingEntry.getLogDumperClass(), equalTo(PostgreSQLWalDumper.class));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册