未验证 提交 fd6f5355 编写于 作者: L Lucas 提交者: GitHub

issue-5688: Update mysql datasource checker (#5705)

Co-authored-by: NLucas <qiulu3@jd.com>
上级 925b8de8
......@@ -31,4 +31,8 @@ public final class FixtureH2DataSourceChecker implements DataSourceChecker {
@Override
public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
}
@Override
public void checkVariable(final Collection<? extends DataSource> dataSources) {
}
}
......@@ -70,6 +70,7 @@ public final class ShardingScalingJobPreparer {
DataSourceChecker dataSourceChecker = DataSourceCheckerCheckerFactory.newInstanceDataSourceChecker(databaseType);
dataSourceChecker.checkConnection(dataSourceManager.getCachedDataSources().values());
dataSourceChecker.checkPrivilege(dataSourceManager.getSourceDatasources().values());
dataSourceChecker.checkVariable(dataSourceManager.getSourceDatasources().values());
}
private void splitInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
......
......@@ -38,4 +38,11 @@ public interface DataSourceChecker {
* @param dataSources datasource connections
*/
void checkPrivilege(Collection<? extends DataSource> dataSources);
/**
* Check datasource variables.
*
* @param dataSources datasource connections
*/
void checkVariable(Collection<? extends DataSource> dataSources);
}
......@@ -53,6 +53,11 @@ public final class AbstractDataSourceCheckerTest {
public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
}
@Override
public void checkVariable(final Collection<? extends DataSource> dataSources) {
}
};
dataSources = new ArrayList<>();
dataSources.add(dataSource);
......
......@@ -44,5 +44,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -88,22 +88,13 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null == event) {
sleep();
continue;
if (null != event) {
handleEvent(channel, uri, event);
}
handleEvent(channel, uri, event);
}
pushRecord(channel, new FinishedRecord(new NopLogPosition()));
}
private void sleep() {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
private void handleEvent(final Channel channel, final JdbcUri uri, final AbstractBinlogEvent event) {
if (event instanceof WriteRowsEvent) {
handleWriteRowsEvent(channel, uri, (WriteRowsEvent) event);
......
......@@ -25,59 +25,83 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Data source checker for MySQL.
*/
public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
private static final String QUERY_SQL = "SELECT * FROM %s LIMIT 1";
private static final String SHOW_GRANTS_SQL = "SHOW GRANTS";
private static final String SHOW_MASTER_STATUS_SQL = "SHOW MASTER STATUS";
private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
private static final String SHOW_VARIABLES_SQL = "SHOW VARIABLES LIKE '%s'";
private static final Map<String, String> REQUIRED_VARIABLES = new HashMap(2);
static {
REQUIRED_VARIABLES.put("LOG_BIN", "ON");
REQUIRED_VARIABLES.put("BINLOG_FORMAT", "ROW");
}
@Override
public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
for (DataSource each : dataSources) {
checkPrivilege0(each);
checkPrivilege(each);
}
}
private void checkPrivilege0(final DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
String tableName = getFirstTableName(connection);
checkQueuePrivilege(connection, tableName);
checkBinlogPrivilege(connection);
private void checkPrivilege(final DataSource dataSource) {
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(SHOW_GRANTS_SQL);
ResultSet resultSet = preparedStatement.executeQuery()) {
while (resultSet.next()) {
String privilege = resultSet.getString(1).toUpperCase();
if (matchPrivileges(privilege)) {
return;
}
}
} catch (SQLException e) {
throw new PrepareFailedException("Datasources privileges check failed!");
throw new PrepareFailedException("Source datasource check privileges failed.");
}
throw new PrepareFailedException("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
}
private String getFirstTableName(final Connection connection) throws SQLException {
try (ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), null, "%", new String[]{"TABLE"})) {
if (tables.next()) {
return tables.getString(3);
}
throw new PrepareFailedException("No tables find in the source datasource.");
private boolean matchPrivileges(final String privilege) {
return Arrays.stream(REQUIRED_PRIVILEGES)
.anyMatch(requiredPrivileges -> Arrays.stream(requiredPrivileges).allMatch(required -> privilege.contains(required)));
}
@Override
public void checkVariable(final Collection<? extends DataSource> dataSources) {
for (DataSource each : dataSources) {
checkVariable(each);
}
}
private void checkQueuePrivilege(final Connection connection, final String tableName) {
try (PreparedStatement preparedStatement = connection.prepareStatement(String.format(QUERY_SQL, tableName))) {
preparedStatement.executeQuery();
private void checkVariable(final DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
for (Map.Entry<String, String> entry : REQUIRED_VARIABLES.entrySet()) {
checkVariable(connection, entry);
}
} catch (SQLException e) {
throw new PrepareFailedException("Source datasource is lack of query privileges.");
throw new PrepareFailedException("Source datasource check variables failed.");
}
}
private void checkBinlogPrivilege(final Connection connection) {
try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_MASTER_STATUS_SQL);
ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
throw new PrepareFailedException("Source datasource do not open binlog.");
private void checkVariable(final Connection connection, final Map.Entry<String, String> entry) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(String.format(SHOW_VARIABLES_SQL, entry.getKey()));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
String value = resultSet.getString(2);
if (!entry.getValue().equalsIgnoreCase(value)) {
throw new PrepareFailedException(String.format("Source datasource required %s = %s, now is %s", entry.getKey(), entry.getValue(), value));
}
} catch (SQLException e) {
throw new PrepareFailedException("Source datasource is lack of replication(binlog) privileges.");
}
}
}
......@@ -47,6 +47,7 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* MySQL Connector.
......@@ -193,7 +194,11 @@ public final class MySQLClient {
* @return binlog event
*/
public synchronized AbstractBinlogEvent poll() {
return blockingEventQueue.poll();
try {
return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
return null;
}
}
@SuppressWarnings("unchecked")
......
......@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.mysql;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.junit.Before;
import org.junit.Test;
......@@ -28,7 +27,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -37,6 +35,7 @@ import java.util.Collection;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -44,8 +43,6 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MySQLDataSourceCheckerTest {
private static final String CATALOG = "test";
@Mock
private Connection connection;
......@@ -62,74 +59,57 @@ public class MySQLDataSourceCheckerTest {
@Before
public void setUp() throws SQLException {
DataSource dataSource = mock(DataSource.class);
mockConnection();
mockResultSet();
when(dataSource.getConnection()).thenReturn(connection);
dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSourceChecker = new MySQLDataSourceChecker();
when(connection.prepareStatement(anyString())).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
}
@SneakyThrows
private void mockConnection() {
DatabaseMetaData metaData = mock(DatabaseMetaData.class);
when(connection.getMetaData()).thenReturn(metaData);
when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
when(connection.getCatalog()).thenReturn(CATALOG);
when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenReturn(preparedStatement);
when(connection.prepareStatement("SHOW MASTER STATUS")).thenReturn(preparedStatement);
}
@SneakyThrows
private void mockResultSet() {
@Test
public void assertCheckPrivilegeWithParticularSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(3)).thenReturn("test");
when(resultSet.getString(1)).thenReturn("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement, Mockito.times(1)).executeQuery();
}
@Test
public void assertCheckPrivilegeSuccess() throws SQLException {
public void assertCheckPrivilegeWithAllSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(1)).thenReturn("GRANT ALL PRIVILEGES CLIENT ON *.* TO '%'@'%'");
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement, Mockito.times(2)).executeQuery();
verify(preparedStatement, Mockito.times(1)).executeQuery();
}
@Test
public void assertCheckPrivilegeWithGettingTableFailure() throws SQLException {
public void assertCheckPrivilegeFailure() throws SQLException {
when(resultSet.next()).thenReturn(false);
try {
dataSourceChecker.checkPrivilege(dataSources);
} catch (PrepareFailedException checkFailedEx) {
assertThat(checkFailedEx.getMessage(), is("No tables find in the source datasource."));
assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
}
}
@Test
public void assertCheckPrivilegeWithNoQueryPrivilegeFailure() throws SQLException {
when(preparedStatement.executeQuery()).thenThrow(new SQLException());
try {
dataSourceChecker.checkPrivilege(dataSources);
} catch (PrepareFailedException checkFailedEx) {
assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of query privileges."));
}
public void assertCheckVariableSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true, true);
when(resultSet.getString(2)).thenReturn("ON", "ROW");
dataSourceChecker.checkVariable(dataSources);
verify(preparedStatement, Mockito.times(2)).executeQuery();
}
@Test
public void assertCheckPrivilegeWithNoReplicationPrivilegeFailure() throws SQLException {
when(connection.prepareStatement("SHOW MASTER STATUS")).thenThrow(new SQLException());
public void assertCheckVariableFailure() throws SQLException {
when(resultSet.next()).thenReturn(true, true);
when(resultSet.getString(2)).thenReturn("OFF", "ROW");
try {
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
} catch (PrepareFailedException checkFailedEx) {
assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of replication(binlog) privileges."));
assertThat(checkFailedEx.getMessage(), is("Source datasource required LOG_BIN = ON, now is OFF"));
}
}
@Test
public void assertCheckPrivilegeWithNoBinlogFailure() throws SQLException {
when(resultSet.next()).thenReturn(true, false);
try {
dataSourceChecker.checkPrivilege(dataSources);
} catch (PrepareFailedException checkFailedEx) {
assertThat(checkFailedEx.getMessage(), is("Source datasource do not open binlog."));
}
}
}
......@@ -50,4 +50,9 @@ public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker
throw new PrepareFailedException("Datasources check failed!");
}
}
@Override
public void checkVariable(final Collection<? extends DataSource> dataSources) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册