Unverified · Commit ced0c780 authored by 邱鹿 Lucas, committed by GitHub

Optimize data consistency checker. (#7570)

* Optimize data consistency checker.

* Update javadoc.

* rename `result` and replace `AllArgsConstructor`

Co-authored-by: qiulu3 <Lucas209910>
Parent a0d6901a
......@@ -34,6 +34,7 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.ScalingJobController;
+import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
......@@ -42,6 +43,7 @@ import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
import org.apache.shardingsphere.scaling.utils.ResponseContentUtil;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
/**
......@@ -122,8 +124,8 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
private void checkJob(final ChannelHandlerContext context, final String requestPath) {
int jobId = Integer.parseInt(requestPath.split("/")[4]);
try {
-            boolean success = SCALING_JOB_CONTROLLER.check(jobId);
-            response(GSON.toJson(ResponseContentUtil.build(success)), context, HttpResponseStatus.OK);
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResultMap = SCALING_JOB_CONTROLLER.check(jobId);
+            response(GSON.toJson(ResponseContentUtil.build(dataConsistencyCheckResultMap)), context, HttpResponseStatus.OK);
} catch (final ScalingJobNotFoundException ex) {
response(GSON.toJson(ResponseContentUtil.handleBadRequest(ex.getMessage())), context, HttpResponseStatus.BAD_REQUEST);
}
......
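With this change the check endpoint returns a per-table map instead of a single boolean. As a rough orientation (not part of this commit), assuming Gson's default field serialization of DataConsistencyCheckResult and a hypothetical table name t_order, the serialized map inside the response body would look roughly as below; the envelope added by ResponseContentUtil.build is not shown in this diff:

// Hedged sketch only; table name and counts are illustrative.
Map<String, DataConsistencyCheckResult> checkResultMap = SCALING_JOB_CONTROLLER.check(jobId);
String body = GSON.toJson(checkResultMap);
// body ≈ {"t_order":{"sourceCount":100,"destinationCount":100,"countValid":true,"dataValid":true}}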
......@@ -18,21 +18,25 @@
package org.apache.shardingsphere.scaling.fixture;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
+import java.util.Collections;
+import java.util.Map;
@RequiredArgsConstructor
public final class FixtureDataConsistencyChecker implements DataConsistencyChecker {
private final ShardingScalingJob shardingScalingJob;
    @Override
-    public boolean countCheck() {
-        return false;
+    public Map<String, DataConsistencyCheckResult> countCheck() {
+        return Collections.emptyMap();
    }
    
    @Override
-    public boolean dataCheck() {
-        return false;
+    public Map<String, Boolean> dataCheck() {
+        return Collections.emptyMap();
    }
}
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core;
+import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.ScalingJobProgress;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
......@@ -28,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -91,17 +93,22 @@ public final class ScalingJobController {
}
    /**
-     * Execute data consistency check by strict model.
+     * Execute data consistency check.
     *
     * @param shardingScalingJobId sharding scaling job id
-     * @return check success or not
+     * @return check result
     */
-    public boolean check(final int shardingScalingJobId) {
+    public Map<String, DataConsistencyCheckResult> check(final int shardingScalingJobId) {
        if (!scalingJobMap.containsKey(shardingScalingJobId)) {
            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", shardingScalingJobId));
        }
        DataConsistencyChecker dataConsistencyChecker = scalingJobMap.get(shardingScalingJobId).getDataConsistencyChecker();
-        return dataConsistencyChecker.countCheck() && dataConsistencyChecker.dataCheck();
+        Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
+        if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
+            Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
+            result.forEach((key, value) -> value.setDataValid(dataCheckResult.getOrDefault(key, false)));
+        }
+        return result;
}
/**
......
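Callers that previously consumed the single boolean can recover the old all-or-nothing semantics by folding the map. A minimal sketch, assuming the Lombok-generated isCountValid()/isDataValid() getters on DataConsistencyCheckResult (variable names and the job id are hypothetical):

// Not part of this commit: collapse the per-table results into one verdict.
Map<String, DataConsistencyCheckResult> resultMap = scalingJobController.check(1);
boolean allValid = resultMap.values().stream()
        .allMatch(each -> each.isCountValid() && each.isDataValid());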
......@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
import org.apache.shardingsphere.scaling.core.exception.DataCheckFailException;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
......@@ -33,11 +34,14 @@ import org.apache.shardingsphere.scaling.core.utils.ConfigurationYamlConverter;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import javax.sql.DataSource;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Abstract data consistency checker.
......@@ -50,44 +54,48 @@ public abstract class AbstractDataConsistencyChecker implements DataConsistencyC
private final ShardingScalingJob shardingScalingJob;
    @Override
-    public boolean countCheck() {
-        return shardingScalingJob.getSyncConfigurations().stream().allMatch(each -> each.getDumperConfiguration().getTableNameMap().values().stream().distinct().allMatch(this::countCheck));
+    public Map<String, DataConsistencyCheckResult> countCheck() {
+        return shardingScalingJob.getSyncConfigurations()
+                .stream().flatMap(each -> each.getDumperConfiguration().getTableNameMap().values().stream()).collect(Collectors.toSet())
+                .stream().collect(Collectors.toMap(Function.identity(), this::countCheck));
    }
    
-    private boolean countCheck(final String table) {
-        long sourceCount = count(getSourceDataSource(), table);
-        long destinationCount = count(getDestinationDataSource(), table);
-        if (sourceCount == destinationCount) {
-            return true;
-        }
-        throw new DataCheckFailException(String.format("table %s count %d -> %d, check failed.", table, sourceCount, destinationCount));
+    private DataConsistencyCheckResult countCheck(final String table) {
+        try (DataSourceWrapper sourceDataSource = getSourceDataSource();
+             DataSourceWrapper destinationDataSource = getDestinationDataSource()) {
+            long sourceCount = count(sourceDataSource, table);
+            long destinationCount = count(destinationDataSource, table);
+            return new DataConsistencyCheckResult(sourceCount, destinationCount);
+        } catch (IOException ex) {
+            throw new DataCheckFailException(String.format("table %s count check failed.", table), ex);
+        }
    }
    
+    private long count(final DataSource dataSource, final String table) {
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(getSqlBuilder().buildCountSQL(table));
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            resultSet.next();
+            return resultSet.getLong(1);
+        } catch (SQLException ex) {
+            throw new DataCheckFailException(String.format("table %s count failed.", table), ex);
+        }
+    }
+    
-    protected DataSource getSourceDataSource() {
+    protected DataSourceWrapper getSourceDataSource() {
        try {
            Map<String, DataSource> dataSourceMap = DataSourceConverter.getDataSourceMap(
                    ConfigurationYamlConverter.loadDataSourceConfigurations(shardingScalingJob.getScalingConfiguration().getRuleConfiguration().getSourceDatasource()));
            ShardingRuleConfiguration ruleConfiguration = ConfigurationYamlConverter.loadShardingRuleConfiguration(shardingScalingJob.getScalingConfiguration().getRuleConfiguration().getSourceRule());
-            return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Lists.newArrayList(ruleConfiguration), null);
+            return new DataSourceWrapper(ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Lists.newArrayList(ruleConfiguration), null));
        } catch (SQLException ex) {
            throw new DataCheckFailException("get source data source failed.", ex);
        }
    }
    
-    protected DataSource getDestinationDataSource() {
+    protected DataSourceWrapper getDestinationDataSource() {
        RuleConfiguration.YamlDataSourceParameter parameter = shardingScalingJob.getScalingConfiguration().getRuleConfiguration().getDestinationDataSources();
-        return new DataSourceFactory().newInstance(new JDBCDataSourceConfiguration(parameter.getUrl(), parameter.getUsername(), parameter.getPassword()));
-    }
-    
-    private long count(final DataSource dataSource, final String table) {
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = connection.prepareStatement(getSqlBuilder().buildCountSQL(table));
-             ResultSet resultSet = preparedStatement.executeQuery()) {
-            resultSet.next();
-            return resultSet.getLong(1);
-        } catch (SQLException ex) {
-            throw new DataCheckFailException(String.format("count %s failed.", table), ex);
-        }
+        return new DataSourceWrapper(new DataSourceFactory().newInstance(new JDBCDataSourceConfiguration(parameter.getUrl(), parameter.getUsername(), parameter.getPassword())));
    }
protected abstract AbstractSqlBuilder getSqlBuilder();
......
......@@ -15,71 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.datasource;
+package org.apache.shardingsphere.scaling.core.check;
 
-import com.zaxxer.hikari.HikariDataSource;
-import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
 
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.logging.Logger;
 
-@AllArgsConstructor
-public final class HikariDataSourceWrapper implements DataSourceWrapper {
-    
-    private final HikariDataSource dataSource;
-    
-    @Override
-    public void close() {
-        if (!dataSource.isClosed()) {
-            dataSource.close();
-        }
-    }
-    
-    @Override
-    public Connection getConnection() throws SQLException {
-        return dataSource.getConnection();
-    }
-    
-    @Override
-    public Connection getConnection(final String username, final String password) throws SQLException {
-        return dataSource.getConnection(username, password);
-    }
-    
-    @Override
-    public <T> T unwrap(final Class<T> iface) throws SQLException {
-        return dataSource.unwrap(iface);
-    }
-    
-    @Override
-    public boolean isWrapperFor(final Class<?> iface) throws SQLException {
-        return dataSource.isWrapperFor(iface);
-    }
-    
-    @Override
-    public PrintWriter getLogWriter() throws SQLException {
-        return dataSource.getLogWriter();
-    }
-    
-    @Override
-    public void setLogWriter(final PrintWriter out) throws SQLException {
-        dataSource.setLogWriter(out);
-    }
-    
-    @Override
-    public void setLoginTimeout(final int seconds) throws SQLException {
-        dataSource.setLoginTimeout(seconds);
-    }
-    
-    @Override
-    public int getLoginTimeout() throws SQLException {
-        return dataSource.getLoginTimeout();
-    }
-    
-    @Override
-    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
-        return dataSource.getParentLogger();
-    }
-}
+/**
+ * Data consistency check result.
+ */
+@Getter
+@Setter
+public final class DataConsistencyCheckResult {
+    
+    private final long sourceCount;
+    
+    private final long destinationCount;
+    
+    private final boolean countValid;
+    
+    private boolean dataValid;
+    
+    public DataConsistencyCheckResult(final long sourceCount, final long destinationCount) {
+        this.sourceCount = sourceCount;
+        this.destinationCount = destinationCount;
+        countValid = sourceCount == destinationCount;
+    }
+}
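Per the commit message, @AllArgsConstructor is replaced by an explicit constructor so that countValid is always derived from the two counts, while dataValid is filled in later by ScalingJobController once the data check has run. A hedged usage sketch, assuming the Lombok-generated accessors:

// Illustrative only, not committed code.
DataConsistencyCheckResult result = new DataConsistencyCheckResult(100L, 100L);
assert result.isCountValid();  // derived in the constructor: 100 == 100
result.setDataValid(true);     // set afterwards, when the CRC32 data check passes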
......@@ -17,6 +17,8 @@
package org.apache.shardingsphere.scaling.core.check;
+import java.util.Map;
/**
* Data consistency checker interface.
*/
......@@ -25,14 +27,14 @@ public interface DataConsistencyChecker {
    /**
     * Check each table count is valid.
     *
-     * @return count is valid or not.
+     * @return count check result
     */
-    boolean countCheck();
+    Map<String, DataConsistencyCheckResult> countCheck();
    
    /**
     * Check each table data is valid.
     *
-     * @return data is valid or not.
+     * @return data is valid or not
     */
-    boolean dataCheck();
+    Map<String, Boolean> dataCheck();
}
......@@ -44,6 +44,6 @@ public final class DataSourceFactory {
result.setJdbcUrl(dataSourceConfiguration.getJdbcUrl());
result.setUsername(dataSourceConfiguration.getUsername());
result.setPassword(dataSourceConfiguration.getPassword());
-        return new HikariDataSourceWrapper(result);
+        return new DataSourceWrapper(result);
}
}
......@@ -95,7 +95,7 @@ public final class DataSourceManager implements AutoCloseable {
try {
each.close();
} catch (final IOException ex) {
-                log.warn("An exception occurred while closing the data source", ex);
+                log.error("An exception occurred while closing the data source", ex);
}
}
cachedDataSources.clear();
......
......@@ -17,11 +17,84 @@
package org.apache.shardingsphere.scaling.core.datasource;
+import lombok.RequiredArgsConstructor;
 
import javax.sql.DataSource;
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.logging.Logger;
/**
 * Data source wrapper, abstracting over standard JDBC and sharding JDBC data sources.
*/
-public interface DataSourceWrapper extends DataSource, Closeable {
+@RequiredArgsConstructor
+public class DataSourceWrapper implements DataSource, AutoCloseable {
private final DataSource dataSource;
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
@Override
public Connection getConnection(final String username, final String password) throws SQLException {
return dataSource.getConnection(username, password);
}
@Override
public <T> T unwrap(final Class<T> iface) throws SQLException {
return dataSource.unwrap(iface);
}
@Override
public boolean isWrapperFor(final Class<?> iface) throws SQLException {
return dataSource.isWrapperFor(iface);
}
@Override
public PrintWriter getLogWriter() throws SQLException {
return dataSource.getLogWriter();
}
@Override
public void setLogWriter(final PrintWriter out) throws SQLException {
dataSource.setLogWriter(out);
}
@Override
public void setLoginTimeout(final int seconds) throws SQLException {
dataSource.setLoginTimeout(seconds);
}
@Override
public int getLoginTimeout() throws SQLException {
return dataSource.getLoginTimeout();
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return dataSource.getParentLogger();
}
@Override
public void close() throws IOException {
if (dataSource == null) {
return;
}
if (dataSource instanceof AutoCloseable) {
try {
((AutoCloseable) dataSource).close();
} catch (final IOException ex) {
throw ex;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
throw new IOException("data source close failed.", ex);
}
}
}
}
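The former HikariDataSourceWrapper interface/implementation pair collapses into this single delegating class, which can wrap a plain JDBC pool as well as a ShardingSphere data source, so both sides of the check close uniformly via try-with-resources. A hedged construction sketch (URL and pool settings are hypothetical):

// Illustrative only: wrapping a plain HikariCP pool, as DataSourceFactory now does.
HikariDataSource hikari = new HikariDataSource();
hikari.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/ds_source");
DataSourceWrapper wrapper = new DataSourceWrapper(hikari);
// getSourceDataSource likewise wraps ShardingSphereDataSourceFactory.createDataSource(...).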
......@@ -118,12 +118,12 @@ public final class SyncConfigurationUtil {
private static Map<String, Map<String, String>> toDataSourceTableNameMap(final TableRule tableRule) {
Map<String, Map<String, String>> result = new HashMap<>();
-        for (Entry<String, Collection<String>> each : tableRule.getDatasourceToTablesMap().entrySet()) {
-            Map<String, String> tableNameMap = result.get(each.getKey());
+        for (Entry<String, Collection<String>> entry : tableRule.getDatasourceToTablesMap().entrySet()) {
+            Map<String, String> tableNameMap = result.get(entry.getKey());
            if (null == tableNameMap) {
-                result.put(each.getKey(), toTableNameMap(tableRule.getLogicTable(), each.getValue()));
+                result.put(entry.getKey(), toTableNameMap(tableRule.getLogicTable(), entry.getValue()));
            } else {
-                tableNameMap.putAll(toTableNameMap(tableRule.getLogicTable(), each.getValue()));
+                tableNameMap.putAll(toTableNameMap(tableRule.getLogicTable(), entry.getValue()));
            }
}
}
return result;
......@@ -138,12 +138,12 @@ public final class SyncConfigurationUtil {
}
private static void mergeDataSourceTableNameMap(final Map<String, Map<String, String>> mergedResult, final Map<String, Map<String, String>> newDataSourceTableNameMap) {
-        for (Entry<String, Map<String, String>> each : newDataSourceTableNameMap.entrySet()) {
-            Map<String, String> tableNameMap = mergedResult.get(each.getKey());
+        for (Entry<String, Map<String, String>> entry : newDataSourceTableNameMap.entrySet()) {
+            Map<String, String> tableNameMap = mergedResult.get(entry.getKey());
            if (null == tableNameMap) {
-                mergedResult.put(each.getKey(), each.getValue());
+                mergedResult.put(entry.getKey(), entry.getValue());
            } else {
-                tableNameMap.putAll(each.getValue());
+                tableNameMap.putAll(entry.getValue());
            }
}
}
}
......
......@@ -22,8 +22,6 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManagerTest;
-import org.apache.shardingsphere.scaling.core.exception.DataCheckFailException;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
import org.junit.Before;
......@@ -35,7 +33,10 @@ import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class AbstractDataConsistencyCheckerTest {
......@@ -53,34 +54,27 @@ public final class AbstractDataConsistencyCheckerTest {
}
    @Test
-    public void assertCountCheckSuccess() {
-        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration(), 1);
-        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getImporterConfiguration().getDataSourceConfiguration(), 1);
-        assertTrue(dataConsistencyChecker.countCheck());
-    }
-    
-    @Test(expected = DataCheckFailException.class)
-    public void assertCountCheckFailure() {
-        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration(), 1);
-        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getImporterConfiguration().getDataSourceConfiguration(), 0);
-        dataConsistencyChecker.countCheck();
+    public void assertCountCheck() {
+        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration());
+        initTableData(shardingScalingJob.getSyncConfigurations().get(0).getImporterConfiguration().getDataSourceConfiguration());
+        Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.countCheck();
+        assertTrue(resultMap.get("t1").isCountValid());
+        assertThat(resultMap.get("t1").getSourceCount(), is(resultMap.get("t1").getDestinationCount()));
    }
    @SneakyThrows(SQLException.class)
-    private void initTableData(final DataSourceConfiguration dataSourceConfiguration, final int count) {
+    private void initTableData(final DataSourceConfiguration dataSourceConfiguration) {
        DataSource dataSource = new DataSourceManager().getDataSource(dataSourceConfiguration);
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("DROP TABLE IF EXISTS t1");
            statement.execute("CREATE TABLE t1 (id INT PRIMARY KEY, user_id VARCHAR(12))");
-            for (int i = 0; i < count; i++) {
-                statement.execute("INSERT INTO t1 (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
-            }
+            statement.execute("INSERT INTO t1 (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
private void mockShardingScalingJob() {
-        InputStream fileInputStream = DataSourceManagerTest.class.getResourceAsStream("/config.json");
+        InputStream fileInputStream = AbstractDataConsistencyCheckerTest.class.getResourceAsStream("/config.json");
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream);
ScalingConfiguration scalingConfiguration = GSON.fromJson(inputStreamReader, ScalingConfiguration.class);
shardingScalingJob = new ShardingScalingJob(scalingConfiguration);
......
......@@ -59,7 +59,7 @@ public final class DataSourceManagerTest {
public void assertGetDataSource() {
DataSourceManager dataSourceManager = new DataSourceManager();
DataSource actual = dataSourceManager.getDataSource(syncConfigurations.get(0).getDumperConfiguration().getDataSourceConfiguration());
-        assertThat(actual, instanceOf(HikariDataSourceWrapper.class));
+        assertThat(actual, instanceOf(DataSourceWrapper.class));
}
@Test
......
......@@ -18,10 +18,14 @@
package org.apache.shardingsphere.scaling.core.fixture;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
+import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import java.util.Collections;
+import java.util.Map;
public final class FixtureDataConsistencyChecker extends AbstractDataConsistencyChecker implements DataConsistencyChecker {
public FixtureDataConsistencyChecker(final ShardingScalingJob shardingScalingJob) {
......@@ -29,13 +33,13 @@ public final class FixtureDataConsistencyChecker extends AbstractDataConsistency
}
    @Override
-    public boolean countCheck() {
+    public Map<String, DataConsistencyCheckResult> countCheck() {
        return super.countCheck();
    }
    
    @Override
-    public boolean dataCheck() {
-        return false;
+    public Map<String, Boolean> dataCheck() {
+        return Collections.emptyMap();
}
@Override
......@@ -45,7 +49,7 @@ public final class FixtureDataConsistencyChecker extends AbstractDataConsistency
protected String getLeftIdentifierQuoteString() {
return "`";
}
@Override
protected String getRightIdentifierQuoteString() {
return "`";
......
......@@ -20,7 +20,6 @@ package org.apache.shardingsphere.scaling.core.util;
import com.google.gson.Gson;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManagerTest;
import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
import org.junit.Before;
import org.junit.Test;
......@@ -50,7 +49,7 @@ public final class SyncConfigurationUtilTest {
}
private void initConfig(final String configFile) {
-        InputStream fileInputStream = DataSourceManagerTest.class.getResourceAsStream(configFile);
+        InputStream fileInputStream = SyncConfigurationUtilTest.class.getResourceAsStream(configFile);
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream);
scalingConfiguration = GSON.fromJson(inputStreamReader, ScalingConfiguration.class);
}
......
......@@ -19,22 +19,23 @@ package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
import org.apache.shardingsphere.scaling.core.exception.DataCheckFailException;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import javax.sql.DataSource;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.function.Predicate;
+import java.util.stream.Collectors;
/**
* MySQL data consistency checker.
......@@ -46,42 +47,40 @@ public final class MySQLDataConsistencyChecker extends AbstractDataConsistencyCh
}
    @Override
-    public boolean dataCheck() {
-        return getShardingScalingJob().getSyncConfigurations().stream().allMatch(each -> dataValid(each.getDumperConfiguration().getTableNameMap()));
+    public Map<String, Boolean> dataCheck() {
+        return distinctByValue(getShardingScalingJob().getSyncConfigurations()
+                .stream().flatMap(each -> each.getDumperConfiguration().getTableNameMap().entrySet().stream()).collect(Collectors.toMap(Entry::getKey, Entry::getValue, (u, v) -> u)))
+                .entrySet().stream().collect(Collectors.toMap(Entry::getValue, entry -> dataValid(entry.getKey(), entry.getValue())));
    }
    
-    private boolean dataValid(final Map<String, String> tableNameMap) {
-        return distinctByValue(tableNameMap).entrySet().stream().allMatch(entry -> getColumns(entry.getKey()).stream().allMatch(columnValid(entry.getValue())));
-    }
-    
    private Map<String, String> distinctByValue(final Map<String, String> tableNameMap) {
-        Map<String, String> result = new HashMap<>();
-        Set<String> set = new HashSet<>();
-        for (Entry<String, String> entry : tableNameMap.entrySet()) {
-            if (set.add(entry.getValue())) {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
+        Set<String> distinctSet = new HashSet<>();
+        return tableNameMap.entrySet().stream().filter(entry -> distinctSet.add(entry.getValue())).collect(Collectors.toMap(Entry::getKey, Entry::getValue, (u, v) -> u));
+    }
+    
+    private boolean dataValid(final String actualTableName, final String logicTableName) {
+        try (DataSourceWrapper sourceDataSource = getSourceDataSource();
+             DataSourceWrapper destinationDataSource = getDestinationDataSource()) {
+            return getColumns(actualTableName).stream().allMatch(each -> sumCrc32(sourceDataSource, logicTableName, each) == sumCrc32(destinationDataSource, logicTableName, each));
+        } catch (IOException ex) {
+            throw new DataCheckFailException(String.format("table %s data check failed.", logicTableName), ex);
+        }
    }
    
    private List<String> getColumns(final String tableName) {
        List<String> result = new ArrayList<>();
-        try (Connection connection = getSourceDataSource().getConnection();
+        try (DataSourceWrapper sourceDataSource = getSourceDataSource();
+             Connection connection = sourceDataSource.getConnection();
             ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), null, tableName, "%")) {
            while (resultSet.next()) {
                result.add(resultSet.getString(4));
            }
-        } catch (SQLException ex) {
+        } catch (SQLException | IOException ex) {
            throw new DataCheckFailException("get columns failed.", ex);
        }
        return result;
    }
    
-    private Predicate<? super String> columnValid(final String tableName) {
-        return each -> sumCrc32(getSourceDataSource(), tableName, each) == sumCrc32(getDestinationDataSource(), tableName, each);
-    }
-    
private long sumCrc32(final DataSource dataSource, final String tableName, final String column) {
String sql = getSqlBuilder().buildSumCrc32SQL(tableName, column);
try (Connection connection = dataSource.getConnection();
......
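The hunk truncates sumCrc32; judging from the count(...) helper shown above, it presumably executes the statement produced by buildSumCrc32SQL and reads a single aggregate. A hedged reconstruction follows, with the MySQL SUM(CRC32(...)) shape of the generated SQL stated as an assumption (the builder's actual output is not shown in this diff):

// Hedged sketch, not the committed body of sumCrc32; table and column names are illustrative.
private long sumCrc32(final DataSource dataSource, final String tableName, final String column) {
    // e.g. SELECT SUM(CRC32(`user_id`)) FROM `t_order`
    String sql = getSqlBuilder().buildSumCrc32SQL(tableName, column);
    try (Connection connection = dataSource.getConnection();
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         ResultSet resultSet = preparedStatement.executeQuery()) {
        resultSet.next();
        return resultSet.getLong(1);
    } catch (SQLException ex) {
        throw new DataCheckFailException(String.format("table %s sum crc32 failed.", tableName), ex);
    }
}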
......@@ -22,6 +22,9 @@ import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import java.util.Collections;
+import java.util.Map;
/**
* PostgreSQL data consistency checker.
*/
......@@ -32,8 +35,8 @@ public final class PostgreSQLDataConsistencyChecker extends AbstractDataConsiste
}
    @Override
-    public boolean dataCheck() {
-        return true;
+    public Map<String, Boolean> dataCheck() {
+        return Collections.emptyMap();
}
@Override
......