diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2DataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2DataSourceChecker.java
index 4a5718735478bd5de9ef24e787909cb702e0aa7f..2e181759ea42f04c53c01ad2ab3f9bb15d7f3fbc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2DataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2DataSourceChecker.java
@@ -31,4 +31,8 @@ public final class FixtureH2DataSourceChecker implements DataSourceChecker {
@Override
public void checkPrivilege(final Collection extends DataSource> dataSources) {
}
+
+ @Override
+ public void checkVariable(final Collection extends DataSource> dataSources) {
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 4f4ec1563f2d5c4eec5eea1f3c7307706101dd53..88704ff9fb0ce038adef608d0cb73da86de1a9ac 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -70,6 +70,7 @@ public final class ShardingScalingJobPreparer {
DataSourceChecker dataSourceChecker = DataSourceCheckerCheckerFactory.newInstanceDataSourceChecker(databaseType);
dataSourceChecker.checkConnection(dataSourceManager.getCachedDataSources().values());
dataSourceChecker.checkPrivilege(dataSourceManager.getSourceDatasources().values());
+ dataSourceChecker.checkVariable(dataSourceManager.getSourceDatasources().values());
}
private void splitInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/DataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/DataSourceChecker.java
index 1876d695ee95953dee58f7c7ed7640baac2f38f8..acf00e410c50f7f7295355d0378657476ab22a40 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/DataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/DataSourceChecker.java
@@ -38,4 +38,11 @@ public interface DataSourceChecker {
* @param dataSources datasource connections
*/
void checkPrivilege(Collection extends DataSource> dataSources);
+
+ /**
+ * Check datasource variables.
+ *
+ * @param dataSources datasource connections
+ */
+ void checkVariable(Collection extends DataSource> dataSources);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceCheckerTest.java
index 9048b07daffe179425202fd911132f40e2b3e5eb..878893b5030ebdc17a1a2332231cf0b8b0356ecb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceCheckerTest.java
@@ -53,6 +53,11 @@ public final class AbstractDataSourceCheckerTest {
public void checkPrivilege(final Collection extends DataSource> dataSources) {
}
+
+ @Override
+ public void checkVariable(final Collection extends DataSource> dataSources) {
+
+ }
};
dataSources = new ArrayList<>();
dataSources.add(dataSource);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-mysql/pom.xml
index 64f8369ed57a5ccca444492cca0dd9b5e83eb6e5..b5bef91bf3570a10b109146d0957b684ae5a7669 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/pom.xml
@@ -44,5 +44,10 @@
io.netty
netty-all
+
+ com.h2database
+ h2
+ test
+
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index 3a585a7206ca9c17b82af89c41fca36d81a316d4..f1fc7b8393e4fe171dc0ad881de0d5fca439a74a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -88,22 +88,13 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
- if (null == event) {
- sleep();
- continue;
+ if (null != event) {
+ handleEvent(channel, uri, event);
}
- handleEvent(channel, uri, event);
}
pushRecord(channel, new FinishedRecord(new NopLogPosition()));
}
- private void sleep() {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
-
private void handleEvent(final Channel channel, final JdbcUri uri, final AbstractBinlogEvent event) {
if (event instanceof WriteRowsEvent) {
handleWriteRowsEvent(channel, uri, (WriteRowsEvent) event);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
index 81912b91d0a766e4cd4b9808686e8ee7e1634b7a..8bac86f56718f0b3fbd5d8ad0e16259f513b44da 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
@@ -25,59 +25,83 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
/**
* Data source checker for MySQL.
*/
public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
- private static final String QUERY_SQL = "SELECT * FROM %s LIMIT 1";
+ private static final String SHOW_GRANTS_SQL = "SHOW GRANTS";
- private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
+ private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
+
+ private static final String SHOW_VARIABLES_SQL = "SHOW VARIABLES LIKE '%s'";
+
+ private static final Map REQUIRED_VARIABLES = new HashMap(2);
+
+ static {
+ REQUIRED_VARIABLES.put("LOG_BIN", "ON");
+ REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
+ }
@Override
public void checkPrivilege(final Collection extends DataSource> dataSources) {
for (DataSource each : dataSources) {
- checkPrivilege0(each);
+ checkPrivilege(each);
}
}
- private void checkPrivilege0(final DataSource dataSource) {
- try (Connection connection = dataSource.getConnection()) {
- String tableName = getFirstTableName(connection);
- checkQueuePrivilege(connection, tableName);
- checkBinlogPrivilege(connection);
+ private void checkPrivilege(final DataSource dataSource) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(SHOW_GRANTS_SQL);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ String privilege = resultSet.getString(1).toUpperCase();
+ if (matchPrivileges(privilege)) {
+ return;
+ }
+ }
} catch (SQLException e) {
- throw new PrepareFailedException("Datasources privileges check failed!");
+ throw new PrepareFailedException("Source datasource check privileges failed.");
}
+ throw new PrepareFailedException("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
}
- private String getFirstTableName(final Connection connection) throws SQLException {
- try (ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), null, "%", new String[]{"TABLE"})) {
- if (tables.next()) {
- return tables.getString(3);
- }
- throw new PrepareFailedException("No tables find in the source datasource.");
+ private boolean matchPrivileges(final String privilege) {
+ return Arrays.stream(REQUIRED_PRIVILEGES)
+ .anyMatch(requiredPrivileges -> Arrays.stream(requiredPrivileges).allMatch(required -> privilege.contains(required)));
+ }
+
+ @Override
+ public void checkVariable(final Collection extends DataSource> dataSources) {
+ for (DataSource each : dataSources) {
+ checkVariable(each);
}
}
- private void checkQueuePrivilege(final Connection connection, final String tableName) {
- try (PreparedStatement preparedStatement = connection.prepareStatement(String.format(QUERY_SQL, tableName))) {
- preparedStatement.executeQuery();
+ private void checkVariable(final DataSource dataSource) {
+ try (Connection connection = dataSource.getConnection()) {
+ for (Map.Entry entry : REQUIRED_VARIABLES.entrySet()) {
+ checkVariable(connection, entry);
+ }
} catch (SQLException e) {
- throw new PrepareFailedException("Source datasource is lack of query privileges.");
+ throw new PrepareFailedException("Source datasource check variables failed.");
}
}
- private void checkBinlogPrivilege(final Connection connection) {
- try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_MASTER_STATUS_SQL);
- ResultSet resultSet = preparedStatement.executeQuery()) {
- if (!resultSet.next()) {
- throw new PrepareFailedException("Source datasource do not open binlog.");
+ private void checkVariable(final Connection connection, final Map.Entry entry) throws SQLException {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(String.format(SHOW_VARIABLES_SQL, entry.getKey()));
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ resultSet.next();
+ String value = resultSet.getString(2);
+ if (!entry.getValue().equalsIgnoreCase(value)) {
+ throw new PrepareFailedException(String.format("Source datasource required %s = %s, now is %s", entry.getKey(), entry.getValue(), value));
}
- } catch (SQLException e) {
- throw new PrepareFailedException("Source datasource is lack of replication(binlog) privileges.");
}
}
+
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
index 6f12ab461dbfb61c8046e2cb3580f56e229884e5..d483e080e0d626a1c6358eea8d0bde816e67ca54 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
@@ -47,6 +47,7 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* MySQL Connector.
@@ -193,7 +194,11 @@ public final class MySQLClient {
* @return binlog event
*/
public synchronized AbstractBinlogEvent poll() {
- return blockingEventQueue.poll();
+ try {
+ return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ return null;
+ }
}
@SuppressWarnings("unchecked")
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
index 7e091c3b0e9adf5be9603c783dbd0662e9d4c401..cb2835a7070b043a1bd6c41a8c452d7163dba9b8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.mysql;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.junit.Before;
import org.junit.Test;
@@ -28,7 +27,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -37,6 +35,7 @@ import java.util.Collection;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -44,8 +43,6 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MySQLDataSourceCheckerTest {
- private static final String CATALOG = "test";
-
@Mock
private Connection connection;
@@ -62,74 +59,57 @@ public class MySQLDataSourceCheckerTest {
@Before
public void setUp() throws SQLException {
DataSource dataSource = mock(DataSource.class);
- mockConnection();
- mockResultSet();
when(dataSource.getConnection()).thenReturn(connection);
dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSourceChecker = new MySQLDataSourceChecker();
+ when(connection.prepareStatement(anyString())).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
}
-
- @SneakyThrows
- private void mockConnection() {
- DatabaseMetaData metaData = mock(DatabaseMetaData.class);
- when(connection.getMetaData()).thenReturn(metaData);
- when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
- when(connection.getCatalog()).thenReturn(CATALOG);
- when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenReturn(preparedStatement);
- when(connection.prepareStatement("SHOW MASTER STATUS")).thenReturn(preparedStatement);
- }
-
- @SneakyThrows
- private void mockResultSet() {
+
+ @Test
+ public void assertCheckPrivilegeWithParticularSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true);
- when(resultSet.getString(3)).thenReturn("test");
+ when(resultSet.getString(1)).thenReturn("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
+ dataSourceChecker.checkPrivilege(dataSources);
+ verify(preparedStatement, Mockito.times(1)).executeQuery();
}
-
+
@Test
- public void assertCheckPrivilegeSuccess() throws SQLException {
+ public void assertCheckPrivilegeWithAllSuccess() throws SQLException {
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(1)).thenReturn("GRANT ALL PRIVILEGES CLIENT ON *.* TO '%'@'%'");
dataSourceChecker.checkPrivilege(dataSources);
- verify(preparedStatement, Mockito.times(2)).executeQuery();
+ verify(preparedStatement, Mockito.times(1)).executeQuery();
}
@Test
- public void assertCheckPrivilegeWithGettingTableFailure() throws SQLException {
+ public void assertCheckPrivilegeFailure() throws SQLException {
when(resultSet.next()).thenReturn(false);
try {
dataSourceChecker.checkPrivilege(dataSources);
} catch (PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("No tables find in the source datasource."));
+ assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
}
}
@Test
- public void assertCheckPrivilegeWithNoQueryPrivilegeFailure() throws SQLException {
- when(preparedStatement.executeQuery()).thenThrow(new SQLException());
- try {
- dataSourceChecker.checkPrivilege(dataSources);
- } catch (PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of query privileges."));
- }
+ public void assertCheckVariableSuccess() throws SQLException {
+ when(resultSet.next()).thenReturn(true, true);
+ when(resultSet.getString(2)).thenReturn("ON", "ROW");
+ dataSourceChecker.checkVariable(dataSources);
+ verify(preparedStatement, Mockito.times(2)).executeQuery();
}
@Test
- public void assertCheckPrivilegeWithNoReplicationPrivilegeFailure() throws SQLException {
- when(connection.prepareStatement("SHOW MASTER STATUS")).thenThrow(new SQLException());
+ public void assertCheckVariableFailure() throws SQLException {
+ when(resultSet.next()).thenReturn(true, true);
+ when(resultSet.getString(2)).thenReturn("OFF", "ROW");
try {
- dataSourceChecker.checkPrivilege(dataSources);
+ dataSourceChecker.checkVariable(dataSources);
} catch (PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of replication(binlog) privileges."));
+ assertThat(checkFailedEx.getMessage(), is("Source datasource required LOG_BIN = ON, now is OFF"));
}
}
- @Test
- public void assertCheckPrivilegeWithNoBinlogFailure() throws SQLException {
- when(resultSet.next()).thenReturn(true, false);
- try {
- dataSourceChecker.checkPrivilege(dataSources);
- } catch (PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("Source datasource do not open binlog."));
- }
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceChecker.java
index c336ca8079d56f48211bb6021e02ad2423f5e06e..377e6a8cd6f6e17de9e2ee363dd05acc5f43bb84 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceChecker.java
@@ -50,4 +50,9 @@ public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker
throw new PrepareFailedException("Datasources check failed!");
}
}
+
+ @Override
+ public void checkVariable(final Collection extends DataSource> dataSources) {
+
+ }
}