未验证 提交 bd75ccfb 编写于 作者: A avalon5666 提交者: GitHub

Refactor RdbmsConfiguration (#6883)

* Refactor RdbmsConfiguration (#6687)

* Refactor SyncConfiguration

* Refactor InventoryDataTaskSplitter & SyncTaskFactory

* Refactor DumperConfiguration

* For checkstyle
上级 ca220d0b
...@@ -17,51 +17,25 @@ ...@@ -17,51 +17,25 @@
package org.apache.shardingsphere.scaling.core.config; package org.apache.shardingsphere.scaling.core.config;
import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager; import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* Relational database management system configuration. * Dumper configuration.
*/ */
@Setter @Setter
@Getter @Getter
@EqualsAndHashCode public class DumperConfiguration {
public final class RdbmsConfiguration implements Cloneable {
private String dataSourceName; private String dataSourceName;
private DataSourceConfiguration dataSourceConfiguration; private DataSourceConfiguration dataSourceConfiguration;
private String tableName;
private Map<String, Set<String>> shardingColumnsMap;
private String primaryKey;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private PositionManager positionManager; private PositionManager positionManager;
private Integer spiltNum;
private Map<String, String> tableNameMap; private Map<String, String> tableNameMap;
private int retryTimes;
/**
* Clone to new rdbms configuration.
*
* @param origin origin rdbms configuration
* @return new rdbms configuration
*/
@SneakyThrows
public static RdbmsConfiguration clone(final RdbmsConfiguration origin) {
return (RdbmsConfiguration) origin.clone();
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
import java.util.Set;
/**
 * Importer configuration.
 *
 * <p>Holds the settings an importer needs to write synchronized records into
 * the target data source. Accessors are generated by Lombok.</p>
 */
@Setter
@Getter
public final class ImporterConfiguration {

    // Target data source the importer writes records into.
    private DataSourceConfiguration dataSourceConfiguration;

    // Table name -> sharding column names; used to extract condition columns
    // when building WHERE clauses for UPDATE/DELETE statements.
    private Map<String, Set<String>> shardingColumnsMap;

    // Maximum number of retry attempts when flushing a batch of records fails.
    private int retryTimes;
}
...@@ -17,32 +17,25 @@ ...@@ -17,32 +17,25 @@
package org.apache.shardingsphere.scaling.core.config; package org.apache.shardingsphere.scaling.core.config;
import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil; import lombok.Getter;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager; import lombok.Setter;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is; /**
import static org.junit.Assert.assertNotSame; * Inventory dumper configuration.
import static org.junit.Assert.assertThat; */
@Getter
public final class RdbmsConfigurationTest { @Setter
public final class InventoryDumperConfiguration extends DumperConfiguration {
@Test private String tableName;
public void assertClone() {
RdbmsConfiguration origin = new RdbmsConfiguration(); private String primaryKey;
RdbmsConfiguration clone = RdbmsConfiguration.clone(origin);
assertThat(clone, is(origin)); private Integer spiltNum;
origin.setTableName("t1");
assertNotSame(origin, clone);
}
@Test public InventoryDumperConfiguration(final DumperConfiguration dumperConfiguration) {
public void assertGetWhereCondition() { setDataSourceName(dumperConfiguration.getDataSourceName());
RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration(); setDataSourceConfiguration(dumperConfiguration.getDataSourceConfiguration());
assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("")); setTableNameMap(dumperConfiguration.getTableNameMap());
rdbmsConfiguration.setPrimaryKey("id");
rdbmsConfiguration.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(0, 10)));
assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
} }
} }
...@@ -20,8 +20,6 @@ package org.apache.shardingsphere.scaling.core.config; ...@@ -20,8 +20,6 @@ package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.util.Map;
/** /**
* Sync configuration. * Sync configuration.
*/ */
...@@ -34,9 +32,7 @@ public final class SyncConfiguration { ...@@ -34,9 +32,7 @@ public final class SyncConfiguration {
*/ */
private final int concurrency; private final int concurrency;
private final Map<String, String> tableNameMap; private final DumperConfiguration dumperConfiguration;
private final RdbmsConfiguration dumperConfiguration;
private final RdbmsConfiguration importerConfiguration; private final ImporterConfiguration importerConfiguration;
} }
...@@ -21,8 +21,8 @@ import lombok.AccessLevel; ...@@ -21,8 +21,8 @@ import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant; import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException; import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
...@@ -54,7 +54,7 @@ import java.sql.SQLException; ...@@ -54,7 +54,7 @@ import java.sql.SQLException;
public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor<InventoryPosition> implements JDBCDumper { public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor<InventoryPosition> implements JDBCDumper {
@Getter(AccessLevel.PROTECTED) @Getter(AccessLevel.PROTECTED)
private final RdbmsConfiguration rdbmsConfiguration; private final InventoryDumperConfiguration inventoryDumperConfiguration;
private final DataSourceManager dataSourceManager; private final DataSourceManager dataSourceManager;
...@@ -63,18 +63,18 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor ...@@ -63,18 +63,18 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
@Setter @Setter
private Channel channel; private Channel channel;
protected AbstractJDBCDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { protected AbstractJDBCDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) { if (!JDBCDataSourceConfiguration.class.equals(inventoryDumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("AbstractJDBCDumper only support JDBCDataSourceConfiguration"); throw new UnsupportedOperationException("AbstractJDBCDumper only support JDBCDataSourceConfiguration");
} }
this.rdbmsConfiguration = rdbmsConfiguration; this.inventoryDumperConfiguration = inventoryDumperConfiguration;
this.dataSourceManager = dataSourceManager; this.dataSourceManager = dataSourceManager;
tableMetaData = createTableMetaData(); tableMetaData = createTableMetaData();
} }
private TableMetaData createTableMetaData() { private TableMetaData createTableMetaData() {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration())); MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration()));
return metaDataManager.getTableMetaData(rdbmsConfiguration.getTableName()); return metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName());
} }
@Override @Override
...@@ -84,15 +84,15 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor ...@@ -84,15 +84,15 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
} }
private void dump() { private void dump() {
try (Connection conn = dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()).getConnection()) { try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration()).getConnection()) {
String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration)); String sql = String.format("SELECT * FROM %s %s", inventoryDumperConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(inventoryDumperConfiguration));
PreparedStatement ps = createPreparedStatement(conn, sql); PreparedStatement ps = createPreparedStatement(conn, sql);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData(); ResultSetMetaData metaData = rs.getMetaData();
while (isRunning() && rs.next()) { while (isRunning() && rs.next()) {
DataRecord record = new DataRecord(newInventoryPosition(rs), metaData.getColumnCount()); DataRecord record = new DataRecord(newInventoryPosition(rs), metaData.getColumnCount());
record.setType(ScalingConstant.INSERT); record.setType(ScalingConstant.INSERT);
record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName())); record.setTableName(inventoryDumperConfiguration.getTableNameMap().get(inventoryDumperConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) { for (int i = 1; i <= metaData.getColumnCount(); i++) {
record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i))); record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i)));
} }
...@@ -109,10 +109,10 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor ...@@ -109,10 +109,10 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
} }
private InventoryPosition newInventoryPosition(final ResultSet rs) throws SQLException { private InventoryPosition newInventoryPosition(final ResultSet rs) throws SQLException {
if (null == rdbmsConfiguration.getPrimaryKey()) { if (null == inventoryDumperConfiguration.getPrimaryKey()) {
return new PlaceholderPosition(); return new PlaceholderPosition();
} }
return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue()); return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfiguration.getPositionManager().getPosition()).getEndValue());
} }
protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException; protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
......
...@@ -20,11 +20,12 @@ package org.apache.shardingsphere.scaling.core.execute.executor.dumper; ...@@ -20,11 +20,12 @@ package org.apache.shardingsphere.scaling.core.execute.executor.dumper;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.position.Position; import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry; import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader; import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
/** /**
* Dumper factory. * Dumper factory.
...@@ -35,52 +36,52 @@ public final class DumperFactory { ...@@ -35,52 +36,52 @@ public final class DumperFactory {
/** /**
* New instance of JDBC dumper. * New instance of JDBC dumper.
* *
* @param rdbmsConfiguration rdbms configuration * @param inventoryDumperConfiguration inventory dumper configuration
* @param dataSourceManager data source factory * @param dataSourceManager data source factory
* @return JDBC dumper * @return JDBC dumper
*/ */
@SneakyThrows @SneakyThrows
public static JDBCDumper newInstanceJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public static JDBCDumper newInstanceJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
return newInstanceJdbcDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, dataSourceManager); return newInstanceJdbcDumper(inventoryDumperConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), inventoryDumperConfiguration, dataSourceManager);
} }
/** /**
* New instance of JDBC dumper. * New instance of JDBC dumper.
* *
* @param databaseType database type * @param databaseType database type
* @param rdbmsConfiguration rdbms configuration * @param inventoryDumperConfiguration inventory dumper configuration
* @param dataSourceManager data source factory * @param dataSourceManager data source factory
* @return JDBC dumper * @return JDBC dumper
*/ */
@SneakyThrows @SneakyThrows
public static JDBCDumper newInstanceJdbcDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public static JDBCDumper newInstanceJdbcDumper(final String databaseType, final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType); ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
return scalingEntry.getJdbcDumperClass().getConstructor(RdbmsConfiguration.class, DataSourceManager.class).newInstance(rdbmsConfiguration, dataSourceManager); return scalingEntry.getJdbcDumperClass().getConstructor(InventoryDumperConfiguration.class, DataSourceManager.class).newInstance(inventoryDumperConfiguration, dataSourceManager);
} }
/** /**
* New instance of log dumper. * New instance of log dumper.
* *
* @param rdbmsConfiguration rdbms configuration * @param dumperConfiguration rdbms configuration
* @param position position * @param position position
* @return log dumper * @return log dumper
*/ */
@SneakyThrows @SneakyThrows
public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) { public static LogDumper newInstanceLogDumper(final DumperConfiguration dumperConfiguration, final Position position) {
return newInstanceLogDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, position); return newInstanceLogDumper(dumperConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), dumperConfiguration, position);
} }
/** /**
* New instance of log dumper. * New instance of log dumper.
* *
* @param databaseType database type * @param databaseType database type
* @param rdbmsConfiguration rdbms configuration * @param dumperConfiguration rdbms configuration
* @param position position * @param position position
* @return log dumper * @return log dumper
*/ */
@SneakyThrows @SneakyThrows
public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final Position position) { public static LogDumper newInstanceLogDumper(final String databaseType, final DumperConfiguration dumperConfiguration, final Position position) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType); ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, Position.class).newInstance(rdbmsConfiguration, position); return scalingEntry.getLogDumperClass().getConstructor(DumperConfiguration.class, Position.class).newInstance(dumperConfiguration, position);
} }
} }
...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer; ...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant; import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException; import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
...@@ -47,7 +47,7 @@ import java.util.List; ...@@ -47,7 +47,7 @@ import java.util.List;
@Slf4j @Slf4j
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer { public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
private final RdbmsConfiguration rdbmsConfiguration; private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager; private final DataSourceManager dataSourceManager;
...@@ -56,8 +56,8 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut ...@@ -56,8 +56,8 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
@Setter @Setter
private Channel channel; private Channel channel;
protected AbstractJDBCImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { protected AbstractJDBCImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
this.rdbmsConfiguration = rdbmsConfiguration; this.importerConfiguration = importerConfiguration;
this.dataSourceManager = dataSourceManager; this.dataSourceManager = dataSourceManager;
sqlBuilder = createSqlBuilder(); sqlBuilder = createSqlBuilder();
} }
...@@ -80,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut ...@@ -80,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
while (isRunning()) { while (isRunning()) {
List<Record> records = channel.fetchRecords(100, 3); List<Record> records = channel.fetchRecords(100, 3);
if (null != records && !records.isEmpty()) { if (null != records && !records.isEmpty()) {
flush(dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()), records); flush(dataSourceManager.getDataSource(importerConfiguration.getDataSourceConfiguration()), records);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) { if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
channel.ack(); channel.ack();
break; break;
...@@ -98,7 +98,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut ...@@ -98,7 +98,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
} }
private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) { private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) {
int retryTimes = rdbmsConfiguration.getRetryTimes(); int retryTimes = importerConfiguration.getRetryTimes();
List<Record> unflushed = buffer; List<Record> unflushed = buffer;
do { do {
unflushed = doFlush(dataSource, unflushed); unflushed = doFlush(dataSource, unflushed);
...@@ -152,7 +152,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut ...@@ -152,7 +152,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
} }
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException { private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName())); List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfiguration.getShardingColumnsMap().get(record.getTableName()));
List<Column> values = new ArrayList<>(); List<Column> values = new ArrayList<>();
values.addAll(RecordUtil.extractUpdatedColumns(record)); values.addAll(RecordUtil.extractUpdatedColumns(record));
values.addAll(conditionColumns); values.addAll(conditionColumns);
...@@ -165,7 +165,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut ...@@ -165,7 +165,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
} }
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException { private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName())); List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfiguration.getShardingColumnsMap().get(record.getTableName()));
String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns); String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSql); PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < conditionColumns.size(); i++) { for (int i = 0; i < conditionColumns.size(); i++) {
......
...@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer; ...@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry; import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader; import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
...@@ -34,25 +34,25 @@ public final class ImporterFactory { ...@@ -34,25 +34,25 @@ public final class ImporterFactory {
/** /**
* New instance of importer. * New instance of importer.
* *
* @param rdbmsConfiguration rdbms configuration * @param importerConfiguration rdbms configuration
* @param dataSourceManager data source factory * @param dataSourceManager data source factory
* @return importer * @return importer
*/ */
public static Importer newInstance(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public static Importer newInstance(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
return newInstance(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, dataSourceManager); return newInstance(importerConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), importerConfiguration, dataSourceManager);
} }
/** /**
* New instance of importer. * New instance of importer.
* *
* @param databaseType database type * @param databaseType database type
* @param rdbmsConfiguration rdbms configuration * @param importerConfiguration rdbms configuration
* @param dataSourceManager data source factory * @param dataSourceManager data source factory
* @return importer * @return importer
*/ */
@SneakyThrows @SneakyThrows
public static Importer newInstance(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public static Importer newInstance(final String databaseType, final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType); ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
return scalingEntry.getImporterClass().getConstructor(RdbmsConfiguration.class, DataSourceManager.class).newInstance(rdbmsConfiguration, dataSourceManager); return scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, DataSourceManager.class).newInstance(importerConfiguration, dataSourceManager);
} }
} }
...@@ -104,7 +104,7 @@ public final class ShardingScalingJobPreparer { ...@@ -104,7 +104,7 @@ public final class ShardingScalingJobPreparer {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) { for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
DataSourceConfiguration dataSourceConfiguration = each.getDumperConfiguration().getDataSourceConfiguration(); DataSourceConfiguration dataSourceConfiguration = each.getDumperConfiguration().getDataSourceConfiguration();
each.getDumperConfiguration().setPositionManager(instancePositionManager(databaseType, dataSourceManager.getDataSource(dataSourceConfiguration))); each.getDumperConfiguration().setPositionManager(instancePositionManager(databaseType, dataSourceManager.getDataSource(dataSourceConfiguration)));
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each)); shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(), each.getDumperConfiguration(), each.getImporterConfiguration()));
} }
} }
......
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
package org.apache.shardingsphere.scaling.core.job.preparer.resumer; package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob; import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
...@@ -72,37 +73,37 @@ public final class SyncPositionResumer { ...@@ -72,37 +73,37 @@ public final class SyncPositionResumer {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) { for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration())); MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
for (Entry<String, PositionManager<InventoryPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) { for (Entry<String, PositionManager<InventoryPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry))); result.add(syncTaskFactory.createInventoryDataSyncTask(newInventoryDumperConfiguration(each.getDumperConfiguration(), metaDataManager, entry), each.getImporterConfiguration()));
} }
} }
return result; return result;
} }
private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<InventoryPosition>> entry) { private InventoryDumperConfiguration newInventoryDumperConfiguration(final DumperConfiguration dumperConfiguration, final MetaDataManager metaDataManager,
final Entry<String, PositionManager<InventoryPosition>> entry) {
String[] splitTable = entry.getKey().split("#"); String[] splitTable = entry.getKey().split("#");
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration()); InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfiguration);
splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]); splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
splitDumperConfig.setPositionManager(entry.getValue()); splitDumperConfig.setPositionManager(entry.getValue());
if (2 == splitTable.length) { if (2 == splitTable.length) {
splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1])); splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
} }
splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0)); splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
return new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(), return splitDumperConfig;
splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
} }
private Map<String, PositionManager<InventoryPosition>> getInventoryPositionMap( private Map<String, PositionManager<InventoryPosition>> getInventoryPositionMap(
final RdbmsConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) { final DumperConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName())); Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream() return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
.filter(entry -> pattern.matcher(entry.getKey()).find()) .filter(entry -> pattern.matcher(entry.getKey()).find())
.collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
} }
private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) { private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) { for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName())); each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each)); shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(), each.getDumperConfiguration(), each.getImporterConfiguration()));
} }
} }
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter; package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException; import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
...@@ -59,19 +60,19 @@ public final class InventoryDataTaskSplitter { ...@@ -59,19 +60,19 @@ public final class InventoryDataTaskSplitter {
*/ */
public Collection<ScalingTask<InventoryPosition>> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) { public Collection<ScalingTask<InventoryPosition>> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
Collection<ScalingTask<InventoryPosition>> result = new LinkedList<>(); Collection<ScalingTask<InventoryPosition>> result = new LinkedList<>();
for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) { for (InventoryDumperConfiguration each : splitDumperConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getDumperConfiguration(), dataSourceManager)) {
result.add(syncTaskFactory.createInventoryDataSyncTask(each)); result.add(syncTaskFactory.createInventoryDataSyncTask(each, syncConfiguration.getImporterConfiguration()));
} }
return result; return result;
} }
private Collection<SyncConfiguration> splitConfiguration(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) { private Collection<InventoryDumperConfiguration> splitDumperConfiguration(final int concurrency, final DumperConfiguration dumperConfiguration, final DataSourceManager dataSourceManager) {
Collection<SyncConfiguration> result = new LinkedList<>(); Collection<InventoryDumperConfiguration> result = new LinkedList<>();
DataSource dataSource = dataSourceManager.getDataSource(syncConfiguration.getDumperConfiguration().getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfiguration.getDataSourceConfiguration());
MetaDataManager metaDataManager = new MetaDataManager(dataSource); MetaDataManager metaDataManager = new MetaDataManager(dataSource);
for (SyncConfiguration each : splitByTable(syncConfiguration)) { for (InventoryDumperConfiguration each : splitByTable(dumperConfiguration)) {
if (isSpiltByPrimaryKeyRange(each.getDumperConfiguration(), metaDataManager)) { if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
result.addAll(splitByPrimaryKeyRange(each, metaDataManager, dataSource)); result.addAll(splitByPrimaryKeyRange(concurrency, each, metaDataManager, dataSource));
} else { } else {
result.add(each); result.add(each);
} }
...@@ -79,36 +80,35 @@ public final class InventoryDataTaskSplitter { ...@@ -79,36 +80,35 @@ public final class InventoryDataTaskSplitter {
return result; return result;
} }
private Collection<SyncConfiguration> splitByTable(final SyncConfiguration syncConfiguration) { private Collection<InventoryDumperConfiguration> splitByTable(final DumperConfiguration dumperConfiguration) {
Collection<SyncConfiguration> result = new LinkedList<>(); Collection<InventoryDumperConfiguration> result = new LinkedList<>();
for (String each : syncConfiguration.getTableNameMap().keySet()) { for (String each : dumperConfiguration.getTableNameMap().keySet()) {
RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration()); InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(dumperConfiguration);
dumperConfig.setTableName(each); dumperConfig.setTableName(each);
dumperConfig.setPositionManager(new InventoryPositionManager<>(new PlaceholderPosition())); dumperConfig.setPositionManager(new InventoryPositionManager<>(new PlaceholderPosition()));
result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(), result.add(dumperConfig);
dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
} }
return result; return result;
} }
private boolean isSpiltByPrimaryKeyRange(final RdbmsConfiguration rdbmsConfiguration, final MetaDataManager metaDataManager) { private boolean isSpiltByPrimaryKeyRange(final InventoryDumperConfiguration inventoryDumperConfiguration, final MetaDataManager metaDataManager) {
TableMetaData tableMetaData = metaDataManager.getTableMetaData(rdbmsConfiguration.getTableName()); TableMetaData tableMetaData = metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName());
if (null == tableMetaData) { if (null == tableMetaData) {
log.warn("Can't split range for table {}, reason: can not get table metadata ", rdbmsConfiguration.getTableName()); log.warn("Can't split range for table {}, reason: can not get table metadata ", inventoryDumperConfiguration.getTableName());
return false; return false;
} }
List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns(); List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
if (null == primaryKeys || primaryKeys.isEmpty()) { if (null == primaryKeys || primaryKeys.isEmpty()) {
log.warn("Can't split range for table {}, reason: no primary key", rdbmsConfiguration.getTableName()); log.warn("Can't split range for table {}, reason: no primary key", inventoryDumperConfiguration.getTableName());
return false; return false;
} }
if (primaryKeys.size() > 1) { if (primaryKeys.size() > 1) {
log.warn("Can't split range for table {}, reason: primary key is union primary", rdbmsConfiguration.getTableName()); log.warn("Can't split range for table {}, reason: primary key is union primary", inventoryDumperConfiguration.getTableName());
return false; return false;
} }
int index = tableMetaData.findColumnIndex(primaryKeys.get(0)); int index = tableMetaData.findColumnIndex(primaryKeys.get(0));
if (isNotIntegerPrimary(tableMetaData.getColumnMetaData(index).getDataType())) { if (isNotIntegerPrimary(tableMetaData.getColumnMetaData(index).getDataType())) {
log.warn("Can't split range for table {}, reason: primary key is not integer number", rdbmsConfiguration.getTableName()); log.warn("Can't split range for table {}, reason: primary key is not integer number", inventoryDumperConfiguration.getTableName());
return false; return false;
} }
return true; return true;
...@@ -118,21 +118,20 @@ public final class InventoryDataTaskSplitter { ...@@ -118,21 +118,20 @@ public final class InventoryDataTaskSplitter {
return Types.INTEGER != columnType && Types.BIGINT != columnType && Types.SMALLINT != columnType && Types.TINYINT != columnType; return Types.INTEGER != columnType && Types.BIGINT != columnType && Types.SMALLINT != columnType && Types.TINYINT != columnType;
} }
private Collection<SyncConfiguration> splitByPrimaryKeyRange(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final DataSource dataSource) { private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(final int concurrency, final InventoryDumperConfiguration inventoryDumperConfiguration,
int concurrency = syncConfiguration.getConcurrency(); final MetaDataManager metaDataManager, final DataSource dataSource) {
Collection<SyncConfiguration> result = new LinkedList<>(); Collection<InventoryDumperConfiguration> result = new LinkedList<>();
RdbmsConfiguration dumperConfiguration = syncConfiguration.getDumperConfiguration(); String primaryKey = metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0);
String primaryKey = metaDataManager.getTableMetaData(dumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0); inventoryDumperConfiguration.setPrimaryKey(primaryKey);
dumperConfiguration.setPrimaryKey(primaryKey);
try (Connection connection = dataSource.getConnection()) { try (Connection connection = dataSource.getConnection()) {
PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, dumperConfiguration.getTableName())); PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, inventoryDumperConfiguration.getTableName()));
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
rs.next(); rs.next();
long min = rs.getLong(1); long min = rs.getLong(1);
long max = rs.getLong(2); long max = rs.getLong(2);
long step = (max - min) / concurrency; long step = (max - min) / concurrency;
for (int i = 0; i < concurrency && min <= max; i++) { for (int i = 0; i < concurrency && min <= max; i++) {
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration); InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(inventoryDumperConfiguration);
if (i < concurrency - 1) { if (i < concurrency - 1) {
splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, min + step))); splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, min + step)));
min += step + 1; min += step + 1;
...@@ -140,11 +139,10 @@ public final class InventoryDataTaskSplitter { ...@@ -140,11 +139,10 @@ public final class InventoryDataTaskSplitter {
splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, max))); splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, max)));
} }
splitDumperConfig.setSpiltNum(i); splitDumperConfig.setSpiltNum(i);
result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(), result.add(splitDumperConfig);
splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
} }
} catch (final SQLException ex) { } catch (final SQLException ex) {
throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfiguration.getTableName(), primaryKey), ex); throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", inventoryDumperConfiguration.getTableName(), primaryKey), ex);
} }
return result; return result;
} }
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.task; package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition; import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask; import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask; import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
...@@ -36,12 +38,12 @@ public final class DefaultSyncTaskFactory implements SyncTaskFactory { ...@@ -36,12 +38,12 @@ public final class DefaultSyncTaskFactory implements SyncTaskFactory {
} }
@Override @Override
public InventoryDataScalingTask createInventoryDataSyncTask(final SyncConfiguration syncConfiguration) { public InventoryDataScalingTask createInventoryDataSyncTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration) {
return new InventoryDataScalingTask(syncConfiguration); return new InventoryDataScalingTask(inventoryDumperConfiguration, importerConfiguration);
} }
@Override @Override
public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration) { public IncrementalDataScalingTask createIncrementalDataSyncTask(final int concurrency, final DumperConfiguration dumperConfiguration, final ImporterConfiguration importerConfiguration) {
return new IncrementalDataScalingTask(syncConfiguration); return new IncrementalDataScalingTask(concurrency, dumperConfiguration, importerConfiguration);
} }
} }
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.task; package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition; import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask; import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask; import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
...@@ -41,16 +43,19 @@ public interface SyncTaskFactory { ...@@ -41,16 +43,19 @@ public interface SyncTaskFactory {
/** /**
* Create inventory data sync task. * Create inventory data sync task.
* *
* @param syncConfiguration sync configuration * @param inventoryDumperConfiguration inventory dumper configuration
* @param importerConfiguration importer configuration
* @return inventory data sync task * @return inventory data sync task
*/ */
InventoryDataScalingTask createInventoryDataSyncTask(SyncConfiguration syncConfiguration); InventoryDataScalingTask createInventoryDataSyncTask(InventoryDumperConfiguration inventoryDumperConfiguration, ImporterConfiguration importerConfiguration);
/** /**
* Create incremental data sync task. * Create incremental data sync task.
* *
* @param syncConfiguration sync configuration * @param concurrency concurrency
* @param dumperConfiguration dumper configuration
* @param importerConfiguration importer configuration
* @return incremental data sync task * @return incremental data sync task
*/ */
IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration); IncrementalDataScalingTask createIncrementalDataSyncTask(int concurrency, DumperConfiguration dumperConfiguration, ImporterConfiguration importerConfiguration);
} }
...@@ -18,8 +18,9 @@ ...@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.task.incremental; package org.apache.shardingsphere.scaling.core.job.task.incremental;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext; import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException; import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback; import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
...@@ -46,7 +47,11 @@ import java.util.concurrent.Future; ...@@ -46,7 +47,11 @@ import java.util.concurrent.Future;
@Slf4j @Slf4j
public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor<IncrementalPosition> implements ScalingTask<IncrementalPosition> { public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor<IncrementalPosition> implements ScalingTask<IncrementalPosition> {
private final SyncConfiguration syncConfiguration; private final int concurrency;
private final DumperConfiguration dumperConfiguration;
private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager; private final DataSourceManager dataSourceManager;
...@@ -55,17 +60,18 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe ...@@ -55,17 +60,18 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private long delayMillisecond; private long delayMillisecond;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) { public IncrementalDataScalingTask(final int concurrency, final DumperConfiguration dumperConfiguration, final ImporterConfiguration importerConfiguration) {
this.syncConfiguration = syncConfiguration; this.concurrency = concurrency;
this.dumperConfiguration = dumperConfiguration;
this.importerConfiguration = importerConfiguration;
dataSourceManager = new DataSourceManager(); dataSourceManager = new DataSourceManager();
setTaskId(syncConfiguration.getDumperConfiguration().getDataSourceName()); setTaskId(dumperConfiguration.getDataSourceName());
setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager()); setPositionManager(dumperConfiguration.getPositionManager());
} }
@Override @Override
public void start() { public void start() {
syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap()); dumper = DumperFactory.newInstanceLogDumper(dumperConfiguration, getPositionManager().getPosition());
dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getPosition());
Collection<Importer> importers = instanceImporters(); Collection<Importer> importers = instanceImporters();
instanceChannel(importers); instanceChannel(importers);
Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() { Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
...@@ -86,9 +92,9 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe ...@@ -86,9 +92,9 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
} }
private List<Importer> instanceImporters() { private List<Importer> instanceImporters() {
List<Importer> result = new ArrayList<>(syncConfiguration.getConcurrency()); List<Importer> result = new ArrayList<>(concurrency);
for (int i = 0; i < syncConfiguration.getConcurrency(); i++) { for (int i = 0; i < concurrency; i++) {
result.add(ImporterFactory.newInstance(syncConfiguration.getImporterConfiguration(), dataSourceManager)); result.add(ImporterFactory.newInstance(importerConfiguration, dataSourceManager));
} }
return result; return result;
} }
......
...@@ -18,10 +18,9 @@ ...@@ -18,10 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory; package org.apache.shardingsphere.scaling.core.job.task.inventory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext; import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException; import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback; import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
...@@ -37,6 +36,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Record; ...@@ -37,6 +36,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.SyncProgress; import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition; import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask; import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
...@@ -52,7 +52,9 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -52,7 +52,9 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> { public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
private final SyncConfiguration syncConfiguration; private final InventoryDumperConfiguration inventoryDumperConfiguration;
private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager; private final DataSourceManager dataSourceManager;
...@@ -62,28 +64,29 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu ...@@ -62,28 +64,29 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
private Dumper dumper; private Dumper dumper;
public InventoryDataScalingTask(final SyncConfiguration syncConfiguration) { public InventoryDataScalingTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration) {
this(syncConfiguration, new DataSourceManager()); this(inventoryDumperConfiguration, importerConfiguration, new DataSourceManager());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) { public InventoryDataScalingTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
this.syncConfiguration = syncConfiguration; this.inventoryDumperConfiguration = inventoryDumperConfiguration;
this.importerConfiguration = importerConfiguration;
this.dataSourceManager = dataSourceManager; this.dataSourceManager = dataSourceManager;
setTaskId(generateSyncTaskId(syncConfiguration.getDumperConfiguration())); setTaskId(generateSyncTaskId(inventoryDumperConfiguration));
setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager()); setPositionManager(inventoryDumperConfiguration.getPositionManager());
} }
private String generateSyncTaskId(final RdbmsConfiguration dumperConfiguration) { private String generateSyncTaskId(final InventoryDumperConfiguration inventoryDumperConfiguration) {
String result = String.format("%s.%s", dumperConfiguration.getDataSourceName(), dumperConfiguration.getTableName()); String result = String.format("%s.%s", inventoryDumperConfiguration.getDataSourceName(), inventoryDumperConfiguration.getTableName());
return null == dumperConfiguration.getSpiltNum() ? result : result + "#" + dumperConfiguration.getSpiltNum(); return null == inventoryDumperConfiguration.getSpiltNum() ? result : result + "#" + inventoryDumperConfiguration.getSpiltNum();
} }
@Override @Override
public void start() { public void start() {
getEstimatedRows(); getEstimatedRows();
instanceDumper(); instanceDumper();
Importer importer = ImporterFactory.newInstance(syncConfiguration.getImporterConfiguration(), dataSourceManager); Importer importer = ImporterFactory.newInstance(importerConfiguration, dataSourceManager);
instanceChannel(importer); instanceChannel(importer);
Future<?> future = ScalingContext.getInstance().getImporterExecuteEngine().submit(importer, new ExecuteCallback() { Future<?> future = ScalingContext.getInstance().getImporterExecuteEngine().submit(importer, new ExecuteCallback() {
...@@ -103,11 +106,11 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu ...@@ -103,11 +106,11 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
} }
private void getEstimatedRows() { private void getEstimatedRows() {
DataSource dataSource = dataSourceManager.getDataSource(syncConfiguration.getDumperConfiguration().getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection()) { try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.prepareStatement(String.format("SELECT COUNT(*) FROM %s %s", ResultSet resultSet = connection.prepareStatement(String.format("SELECT COUNT(*) FROM %s %s",
syncConfiguration.getDumperConfiguration().getTableName(), inventoryDumperConfiguration.getTableName(),
RdbmsConfigurationUtil.getWhereCondition(syncConfiguration.getDumperConfiguration()))) RdbmsConfigurationUtil.getWhereCondition(inventoryDumperConfiguration)))
.executeQuery(); .executeQuery();
resultSet.next(); resultSet.next();
estimatedRows = resultSet.getInt(1); estimatedRows = resultSet.getInt(1);
...@@ -117,8 +120,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu ...@@ -117,8 +120,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
} }
private void instanceDumper() { private void instanceDumper() {
syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap()); dumper = DumperFactory.newInstanceJdbcDumper(inventoryDumperConfiguration, dataSourceManager);
dumper = DumperFactory.newInstanceJdbcDumper(syncConfiguration.getDumperConfiguration(), dataSourceManager);
} }
private void instanceChannel(final Importer importer) { private void instanceChannel(final Importer importer) {
......
...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.utils; ...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.utils;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager; import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition; import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
...@@ -32,12 +32,12 @@ public final class RdbmsConfigurationUtil { ...@@ -32,12 +32,12 @@ public final class RdbmsConfigurationUtil {
/** /**
* Get SQL where condition whit primary key. * Get SQL where condition whit primary key.
* *
* @param rdbmsConfiguration rdbms configuration * @param inventoryDumperConfiguration rdbms configuration
* @return SQL where condition * @return SQL where condition
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) { public static String getWhereCondition(final InventoryDumperConfiguration inventoryDumperConfiguration) {
return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager()); return getWhereCondition(inventoryDumperConfiguration.getPrimaryKey(), inventoryDumperConfiguration.getPositionManager());
} }
private static String getWhereCondition(final String primaryKey, final PositionManager<PrimaryKeyPosition> positionManager) { private static String getWhereCondition(final String primaryKey, final PositionManager<PrimaryKeyPosition> positionManager) {
......
...@@ -22,9 +22,10 @@ import com.google.common.collect.Sets; ...@@ -22,9 +22,10 @@ import com.google.common.collect.Sets;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.config.DataSourceConfiguration; import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration; import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration; import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser; import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
...@@ -64,10 +65,10 @@ public final class SyncConfigurationUtil { ...@@ -64,10 +65,10 @@ public final class SyncConfigurationUtil {
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet()); Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfiguration.getJobConfiguration()); filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfiguration.getJobConfiguration());
for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) { for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
RdbmsConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey())); DumperConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey()), entry.getValue());
dumperConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes()); ImporterConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule); importerConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), entry.getValue(), dumperConfiguration, importerConfiguration)); result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), dumperConfiguration, importerConfiguration));
} }
return result; return result;
} }
...@@ -147,19 +148,20 @@ public final class SyncConfigurationUtil { ...@@ -147,19 +148,20 @@ public final class SyncConfigurationUtil {
} }
} }
private static RdbmsConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration) { private static DumperConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration, final Map<String, String> tableMap) {
RdbmsConfiguration result = new RdbmsConfiguration(); DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName(dataSourceName); result.setDataSourceName(dataSourceName);
Map<String, Object> dataSourceProperties = dataSourceConfiguration.getProps(); Map<String, Object> dataSourceProperties = dataSourceConfiguration.getProps();
JDBCDataSourceConfiguration dumperDataSourceConfiguration = new JDBCDataSourceConfiguration( JDBCDataSourceConfiguration dumperDataSourceConfiguration = new JDBCDataSourceConfiguration(
dataSourceProperties.containsKey("jdbcUrl") ? dataSourceProperties.get("jdbcUrl").toString() : dataSourceProperties.get("url").toString(), dataSourceProperties.containsKey("jdbcUrl") ? dataSourceProperties.get("jdbcUrl").toString() : dataSourceProperties.get("url").toString(),
dataSourceProperties.get("username").toString(), dataSourceProperties.get("password").toString()); dataSourceProperties.get("username").toString(), dataSourceProperties.get("password").toString());
result.setDataSourceConfiguration(dumperDataSourceConfiguration); result.setDataSourceConfiguration(dumperDataSourceConfiguration);
result.setTableNameMap(tableMap);
return result; return result;
} }
private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) { private static ImporterConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) {
RdbmsConfiguration result = new RdbmsConfiguration(); ImporterConfiguration result = new ImporterConfiguration();
JDBCDataSourceConfiguration importerDataSourceConfiguration = new JDBCDataSourceConfiguration( JDBCDataSourceConfiguration importerDataSourceConfiguration = new JDBCDataSourceConfiguration(
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUrl(), scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUrl(),
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUsername(), scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUsername(),
......
...@@ -21,7 +21,7 @@ import com.google.common.collect.Maps; ...@@ -21,7 +21,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel; import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column; import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
...@@ -85,7 +85,7 @@ public final class AbstractJDBCImporterTest { ...@@ -85,7 +85,7 @@ public final class AbstractJDBCImporterTest {
@Before @Before
public void setUp() throws SQLException { public void setUp() throws SQLException {
jdbcImporter = new AbstractJDBCImporter(getRdbmsConfiguration(), dataSourceManager) { jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(), dataSourceManager) {
@Override @Override
protected AbstractSqlBuilder createSqlBuilder() { protected AbstractSqlBuilder createSqlBuilder() {
...@@ -158,9 +158,8 @@ public final class AbstractJDBCImporterTest { ...@@ -158,9 +158,8 @@ public final class AbstractJDBCImporterTest {
return result; return result;
} }
private RdbmsConfiguration getRdbmsConfiguration() { private ImporterConfiguration getImporterConfiguration() {
RdbmsConfiguration result = new RdbmsConfiguration(); ImporterConfiguration result = new ImporterConfiguration();
result.setTableName(TABLE_NAME);
result.setDataSourceConfiguration(dataSourceConfiguration); result.setDataSourceConfiguration(dataSourceConfiguration);
Map<String, Set<String>> shardingColumnsMap = Maps.newHashMap(); Map<String, Set<String>> shardingColumnsMap = Maps.newHashMap();
shardingColumnsMap.put("test_table", Sets.newHashSet("user")); shardingColumnsMap.put("test_table", Sets.newHashSet("user"));
......
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
package org.apache.shardingsphere.scaling.core.fixture; package org.apache.shardingsphere.scaling.core.fixture;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
...@@ -27,8 +27,8 @@ import java.sql.SQLException; ...@@ -27,8 +27,8 @@ import java.sql.SQLException;
public final class FixtureH2JDBCDumper extends AbstractJDBCDumper { public final class FixtureH2JDBCDumper extends AbstractJDBCDumper {
public FixtureH2JDBCDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public FixtureH2JDBCDumper(final InventoryDumperConfiguration dumperConfiguration, final DataSourceManager dataSourceManager) {
super(rdbmsConfiguration, dataSourceManager); super(dumperConfiguration, dataSourceManager);
} }
@Override @Override
......
...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.fixture; ...@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.fixture;
import java.util.List; import java.util.List;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel; import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer; import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
...@@ -32,7 +32,7 @@ public final class FixtureNopImporter implements Importer { ...@@ -32,7 +32,7 @@ public final class FixtureNopImporter implements Importer {
private Channel channel; private Channel channel;
public FixtureNopImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public FixtureNopImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
} }
@Override @Override
......
...@@ -18,8 +18,9 @@ ...@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.preparer.resumer; package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext; import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration; import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
...@@ -83,18 +84,19 @@ public final class SyncPositionResumerTest { ...@@ -83,18 +84,19 @@ public final class SyncPositionResumerTest {
} }
private SyncConfiguration mockSyncConfiguration() { private SyncConfiguration mockSyncConfiguration() {
RdbmsConfiguration dumperConfig = mockDumperConfig(); DumperConfiguration dumperConfig = mockDumperConfig();
RdbmsConfiguration importerConfig = new RdbmsConfiguration(); ImporterConfiguration importerConfig = new ImporterConfiguration();
Map<String, String> tableMap = new HashMap<>(); return new SyncConfiguration(3, dumperConfig, importerConfig);
tableMap.put("t_order", "t_order");
return new SyncConfiguration(3, tableMap, dumperConfig, importerConfig);
} }
private RdbmsConfiguration mockDumperConfig() { private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD); DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
RdbmsConfiguration result = new RdbmsConfiguration(); DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName("ds0"); result.setDataSourceName("ds0");
result.setDataSourceConfiguration(dataSourceConfiguration); result.setDataSourceConfiguration(dataSourceConfiguration);
Map<String, String> tableMap = new HashMap<>();
tableMap.put("t_order", "t_order");
result.setTableNameMap(tableMap);
return result; return result;
} }
} }
...@@ -18,8 +18,9 @@ ...@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter; package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition; import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
...@@ -56,12 +57,9 @@ public final class InventoryDataTaskSplitterTest { ...@@ -56,12 +57,9 @@ public final class InventoryDataTaskSplitterTest {
@Before @Before
public void setUp() { public void setUp() {
RdbmsConfiguration dumperConfig = mockDumperConfig(); DumperConfiguration dumperConfig = mockDumperConfig();
RdbmsConfiguration importerConfig = new RdbmsConfiguration(); ImporterConfiguration importerConfig = new ImporterConfiguration();
Map<String, String> tableMap = new HashMap<>(); syncConfiguration = new SyncConfiguration(3, dumperConfig, importerConfig);
tableMap.put("t_order", "t_order");
syncConfiguration = new SyncConfiguration(3, tableMap,
dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager(); dataSourceManager = new DataSourceManager();
inventoryDataTaskSplitter = new InventoryDataTaskSplitter(); inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
} }
...@@ -103,7 +101,7 @@ public final class InventoryDataTaskSplitterTest { ...@@ -103,7 +101,7 @@ public final class InventoryDataTaskSplitterTest {
assertThat(actual.size(), is(1)); assertThat(actual.size(), is(1));
} }
private void initIntPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException { private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection(); try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
...@@ -113,7 +111,7 @@ public final class InventoryDataTaskSplitterTest { ...@@ -113,7 +111,7 @@ public final class InventoryDataTaskSplitterTest {
} }
} }
private void initCharPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException { private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection(); try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
...@@ -123,7 +121,7 @@ public final class InventoryDataTaskSplitterTest { ...@@ -123,7 +121,7 @@ public final class InventoryDataTaskSplitterTest {
} }
} }
private void initUnionPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException { private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection(); try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
...@@ -133,7 +131,7 @@ public final class InventoryDataTaskSplitterTest { ...@@ -133,7 +131,7 @@ public final class InventoryDataTaskSplitterTest {
} }
} }
private void initNoPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException { private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection(); try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
...@@ -143,10 +141,13 @@ public final class InventoryDataTaskSplitterTest { ...@@ -143,10 +141,13 @@ public final class InventoryDataTaskSplitterTest {
} }
} }
private RdbmsConfiguration mockDumperConfig() { private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD); DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
RdbmsConfiguration result = new RdbmsConfiguration(); DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration); result.setDataSourceConfiguration(dataSourceConfiguration);
Map<String, String> tableMap = new HashMap<>();
tableMap.put("t_order", "t_order");
result.setTableNameMap(tableMap);
return result; return result;
} }
} }
...@@ -18,8 +18,10 @@ ...@@ -18,8 +18,10 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory; package org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext; import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration; import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration; import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
...@@ -54,10 +56,10 @@ public final class InventoryDataScalingTaskTest { ...@@ -54,10 +56,10 @@ public final class InventoryDataScalingTaskTest {
@Before @Before
public void setUp() { public void setUp() {
RdbmsConfiguration dumperConfig = mockDumperConfig(); DumperConfiguration dumperConfig = mockDumperConfig();
RdbmsConfiguration importerConfig = mockImporterConfig(); ImporterConfiguration importerConfig = mockImporterConfig();
ScalingContext.getInstance().init(new ServerConfiguration()); ScalingContext.getInstance().init(new ServerConfiguration());
syncConfiguration = new SyncConfiguration(3, Collections.emptyMap(), dumperConfig, importerConfig); syncConfiguration = new SyncConfiguration(3, dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager(); dataSourceManager = new DataSourceManager();
} }
...@@ -68,20 +70,26 @@ public final class InventoryDataScalingTaskTest { ...@@ -68,20 +70,26 @@ public final class InventoryDataScalingTaskTest {
@Test(expected = SyncTaskExecuteException.class) @Test(expected = SyncTaskExecuteException.class)
public void assertStartWithGetEstimatedRowsFailure() { public void assertStartWithGetEstimatedRowsFailure() {
syncConfiguration.getDumperConfiguration().setTableName("t_non_exist"); InventoryDumperConfiguration inventoryDumperConfiguration = new InventoryDumperConfiguration(syncConfiguration.getDumperConfiguration());
InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(syncConfiguration, dataSourceManager); inventoryDumperConfiguration.setTableName("t_non_exist");
InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(
inventoryDumperConfiguration, syncConfiguration.getImporterConfiguration(), dataSourceManager);
inventoryDataSyncTask.start(); inventoryDataSyncTask.start();
} }
@Test @Test
public void assertGetProgress() throws SQLException { public void assertGetProgress() throws SQLException {
initTableData(syncConfiguration.getDumperConfiguration()); initTableData(syncConfiguration.getDumperConfiguration());
InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(syncConfiguration, dataSourceManager); InventoryDumperConfiguration inventoryDumperConfiguration = new InventoryDumperConfiguration(syncConfiguration.getDumperConfiguration());
inventoryDumperConfiguration.setTableName("t_order");
inventoryDumperConfiguration.setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(
inventoryDumperConfiguration, syncConfiguration.getImporterConfiguration(), dataSourceManager);
inventoryDataSyncTask.start(); inventoryDataSyncTask.start();
assertThat(((InventoryDataSyncTaskProgress) inventoryDataSyncTask.getProgress()).getEstimatedRows(), is(2L)); assertThat(((InventoryDataSyncTaskProgress) inventoryDataSyncTask.getProgress()).getEstimatedRows(), is(2L));
} }
private void initTableData(final RdbmsConfiguration dumperConfig) throws SQLException { private void initTableData(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration()); DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection(); try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
...@@ -91,18 +99,18 @@ public final class InventoryDataScalingTaskTest { ...@@ -91,18 +99,18 @@ public final class InventoryDataScalingTaskTest {
} }
} }
private RdbmsConfiguration mockDumperConfig() { private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD); DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
RdbmsConfiguration result = new RdbmsConfiguration(); DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration); result.setDataSourceConfiguration(dataSourceConfiguration);
result.setTableName("t_order");
result.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(1, 100))); result.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(1, 100)));
result.setTableNameMap(Collections.emptyMap());
return result; return result;
} }
private RdbmsConfiguration mockImporterConfig() { private ImporterConfiguration mockImporterConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD); DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
RdbmsConfiguration result = new RdbmsConfiguration(); ImporterConfiguration result = new ImporterConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration); result.setDataSourceConfiguration(dataSourceConfiguration);
return result; return result;
} }
......
...@@ -46,7 +46,7 @@ public final class SyncConfigurationUtilTest { ...@@ -46,7 +46,7 @@ public final class SyncConfigurationUtilTest {
@Test @Test
public void assertFilterByShardingDataSourceTables() { public void assertFilterByShardingDataSourceTables() {
List<SyncConfiguration> syncConfigurations = (List<SyncConfiguration>) SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration); List<SyncConfiguration> syncConfigurations = (List<SyncConfiguration>) SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration);
assertThat(syncConfigurations.get(0).getTableNameMap().size(), is(1)); assertThat(syncConfigurations.get(0).getDumperConfiguration().getTableNameMap().size(), is(1));
} }
private void initConfig(final String configFile) { private void initConfig(final String configFile) {
......
...@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.mysql; ...@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.mysql;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant; import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory; import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor; import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
...@@ -59,7 +59,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin ...@@ -59,7 +59,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
private final BinlogPosition binlogPosition; private final BinlogPosition binlogPosition;
private final RdbmsConfiguration rdbmsConfiguration; private final DumperConfiguration dumperConfiguration;
private final MetaDataManager metaDataManager; private final MetaDataManager metaDataManager;
...@@ -68,13 +68,13 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin ...@@ -68,13 +68,13 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
@Setter @Setter
private Channel channel; private Channel channel;
public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position binlogPosition) { public MySQLBinlogDumper(final DumperConfiguration dumperConfiguration, final Position binlogPosition) {
this.binlogPosition = (BinlogPosition) binlogPosition; this.binlogPosition = (BinlogPosition) binlogPosition;
if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) { if (!JDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("MySQLBinlogDumper only support JDBCDataSourceConfiguration"); throw new UnsupportedOperationException("MySQLBinlogDumper only support JDBCDataSourceConfiguration");
} }
this.rdbmsConfiguration = rdbmsConfiguration; this.dumperConfiguration = dumperConfiguration;
metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(rdbmsConfiguration.getDataSourceConfiguration())); metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(dumperConfiguration.getDataSourceConfiguration()));
} }
@Override @Override
...@@ -84,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin ...@@ -84,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
} }
private void dump() { private void dump() {
JDBCDataSourceConfiguration jdbcDataSourceConfig = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration(); JDBCDataSourceConfiguration jdbcDataSourceConfig = (JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration();
JdbcUri uri = new JdbcUri(jdbcDataSourceConfig.getJdbcUrl()); JdbcUri uri = new JdbcUri(jdbcDataSourceConfig.getJdbcUrl());
MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfig.getUsername(), jdbcDataSourceConfig.getPassword())); MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfig.getUsername(), jdbcDataSourceConfig.getPassword()));
client.connect(); client.connect();
...@@ -164,7 +164,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin ...@@ -164,7 +164,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) { private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount); DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
result.setTableName(rdbmsConfiguration.getTableNameMap().get(rowsEvent.getTableName())); result.setTableName(dumperConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
result.setCommitTime(rowsEvent.getTimestamp() * 1000); result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result; return result;
} }
...@@ -183,6 +183,6 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin ...@@ -183,6 +183,6 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
} }
private boolean filter(final String database, final String schemaName, final String tableName) { private boolean filter(final String database, final String schemaName, final String tableName) {
return !schemaName.equals(database) || !rdbmsConfiguration.getTableNameMap().containsKey(tableName); return !schemaName.equals(database) || !dumperConfiguration.getTableNameMap().containsKey(tableName);
} }
} }
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.mysql; package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter; import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder; import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
...@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract ...@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract
*/ */
public final class MySQLImporter extends AbstractJDBCImporter { public final class MySQLImporter extends AbstractJDBCImporter {
public MySQLImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public MySQLImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
super(rdbmsConfiguration, dataSourceManager); super(importerConfiguration, dataSourceManager);
} }
@Override @Override
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.mysql; package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper; import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri; import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
...@@ -36,9 +36,9 @@ import java.util.Map.Entry; ...@@ -36,9 +36,9 @@ import java.util.Map.Entry;
*/ */
public final class MySQLJdbcDumper extends AbstractJDBCDumper { public final class MySQLJdbcDumper extends AbstractJDBCDumper {
public MySQLJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public MySQLJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
super(rdbmsConfiguration, dataSourceManager); super(inventoryDumperConfiguration, dataSourceManager);
JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) getRdbmsConfiguration().getDataSourceConfiguration(); JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) getInventoryDumperConfiguration().getDataSourceConfiguration();
jdbcDataSourceConfiguration.setJdbcUrl(fixMySQLUrl(jdbcDataSourceConfiguration.getJdbcUrl())); jdbcDataSourceConfiguration.setJdbcUrl(fixMySQLUrl(jdbcDataSourceConfiguration.getJdbcUrl()));
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql; package org.apache.shardingsphere.scaling.postgresql;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter; import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder; import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
...@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract ...@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract
*/ */
public final class PostgreSQLImporter extends AbstractJDBCImporter { public final class PostgreSQLImporter extends AbstractJDBCImporter {
public PostgreSQLImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public PostgreSQLImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
super(rdbmsConfiguration, dataSourceManager); super(importerConfiguration, dataSourceManager);
} }
@Override @Override
......
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
package org.apache.shardingsphere.scaling.postgresql; package org.apache.shardingsphere.scaling.postgresql;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration; import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
...@@ -31,8 +31,8 @@ import java.sql.SQLException; ...@@ -31,8 +31,8 @@ import java.sql.SQLException;
*/ */
public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper { public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper {
public PostgreSQLJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) { public PostgreSQLJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
super(rdbmsConfiguration, dataSourceManager); super(inventoryDumperConfiguration, dataSourceManager);
} }
@Override @Override
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.shardingsphere.scaling.postgresql; package org.apache.shardingsphere.scaling.postgresql;
import lombok.Setter; import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException; import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor; import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel; import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
...@@ -47,7 +47,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W ...@@ -47,7 +47,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private final WalPosition walPosition; private final WalPosition walPosition;
private final RdbmsConfiguration rdbmsConfiguration; private final DumperConfiguration dumperConfiguration;
private final LogicalReplication logicalReplication = new LogicalReplication(); private final LogicalReplication logicalReplication = new LogicalReplication();
...@@ -56,13 +56,13 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W ...@@ -56,13 +56,13 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
@Setter @Setter
private Channel channel; private Channel channel;
public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) { public PostgreSQLWalDumper(final DumperConfiguration dumperConfiguration, final Position position) {
walPosition = (WalPosition) position; walPosition = (WalPosition) position;
if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) { if (!JDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration"); throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
} }
this.rdbmsConfiguration = rdbmsConfiguration; this.dumperConfiguration = dumperConfiguration;
walEventConverter = new WalEventConverter(rdbmsConfiguration); walEventConverter = new WalEventConverter(dumperConfiguration);
} }
@Override @Override
...@@ -73,7 +73,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W ...@@ -73,7 +73,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private void dump() { private void dump() {
try { try {
PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration()); PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils()); DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber()); PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.postgresql.wal; package org.apache.shardingsphere.scaling.postgresql.wal;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration; import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant; import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory; import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column; import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
...@@ -42,13 +42,13 @@ import java.util.List; ...@@ -42,13 +42,13 @@ import java.util.List;
*/ */
public final class WalEventConverter { public final class WalEventConverter {
private final RdbmsConfiguration rdbmsConfiguration; private final DumperConfiguration dumperConfiguration;
private final MetaDataManager metaDataManager; private final MetaDataManager metaDataManager;
public WalEventConverter(final RdbmsConfiguration rdbmsConfiguration) { public WalEventConverter(final DumperConfiguration dumperConfiguration) {
this.rdbmsConfiguration = rdbmsConfiguration; this.dumperConfiguration = dumperConfiguration;
metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(rdbmsConfiguration.getDataSourceConfiguration())); metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(dumperConfiguration.getDataSourceConfiguration()));
} }
/** /**
...@@ -58,7 +58,7 @@ public final class WalEventConverter { ...@@ -58,7 +58,7 @@ public final class WalEventConverter {
* @return record * @return record
*/ */
public Record convert(final AbstractWalEvent event) { public Record convert(final AbstractWalEvent event) {
JdbcUri uri = new JdbcUri(((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration()).getJdbcUrl()); JdbcUri uri = new JdbcUri(((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration()).getJdbcUrl());
if (filter(uri.getDatabase(), event)) { if (filter(uri.getDatabase(), event)) {
return createPlaceholderRecord(event); return createPlaceholderRecord(event);
} else if (event instanceof WriteRowEvent) { } else if (event instanceof WriteRowEvent) {
...@@ -76,7 +76,7 @@ public final class WalEventConverter { ...@@ -76,7 +76,7 @@ public final class WalEventConverter {
private boolean filter(final String database, final AbstractWalEvent event) { private boolean filter(final String database, final AbstractWalEvent event) {
if (isRowEvent(event)) { if (isRowEvent(event)) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event; AbstractRowEvent rowEvent = (AbstractRowEvent) event;
return !rowEvent.getSchemaName().equals(database) || !rdbmsConfiguration.getTableNameMap().containsKey(rowEvent.getTableName()); return !rowEvent.getSchemaName().equals(database) || !dumperConfiguration.getTableNameMap().containsKey(rowEvent.getTableName());
} }
return false; return false;
} }
...@@ -118,7 +118,7 @@ public final class WalEventConverter { ...@@ -118,7 +118,7 @@ public final class WalEventConverter {
private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) { private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount); DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setTableName(rdbmsConfiguration.getTableNameMap().get(rowsEvent.getTableName())); result.setTableName(dumperConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
return result; return result;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册